You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/07/23 18:55:10 UTC

[45/50] git commit: ACCUMULO-1000 fixed a lot of odds and ends

ACCUMULO-1000 fixed a lot of odds and ends


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5e908585
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5e908585
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5e908585

Branch: refs/heads/ACCUMULO-1000
Commit: 5e908585aa76b605840c87e2f769e2aff642b3a6
Parents: 7bb5f8f
Author: Keith Turner <kt...@apache.org>
Authored: Mon Jul 22 16:37:03 2013 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Jul 22 16:37:03 2013 -0400

----------------------------------------------------------------------
 .../core/client/impl/ConditionalWriterImpl.java |  43 +++--
 .../iterators/system/ColumnQualifierFilter.java |   5 +-
 .../accumulo/server/tabletserver/Tablet.java    |   8 +-
 .../server/tabletserver/TabletServer.java       | 173 ++++++++++---------
 4 files changed, 123 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/5e908585/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
index c87c865..ed20054 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@ -65,6 +65,7 @@ import org.apache.accumulo.core.util.BadArgumentException;
 import org.apache.accumulo.core.util.ByteBufferUtil;
 import org.apache.accumulo.core.util.LoggingRunnable;
 import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.trace.instrument.Tracer;
 import org.apache.accumulo.trace.thrift.TInfo;
 import org.apache.commons.collections.map.LRUMap;
@@ -81,6 +82,10 @@ class ConditionalWriterImpl implements ConditionalWriter {
   
   private static final Logger log = Logger.getLogger(ConditionalWriterImpl.class);
 
+  private static final int MAX_SLEEP = 5000;
+
+  private static final long SESSION_CACHE_TIME = 60000;
+
   private Authorizations auths;
   private VisibilityEvaluator ve;
   @SuppressWarnings("unchecked")
@@ -167,7 +172,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
     
     void resetDelay() {
       // TODO eventually timeout a mutation
-      delay = Math.min(delay * 2, 5000);
+      delay = Math.min(delay * 2, MAX_SLEEP);
       resetTime = System.currentTimeMillis();
     }
   }
@@ -231,7 +236,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
     
     synchronized (serverQueue) {
       serverQueue.queue.add(mutations);
-      //never execute more that one task per server
+      // never execute more than one task per server
       if(!serverQueue.taskQueued){
         threadPool.execute(new LoggingRunnable(log, new SendTask(location)));
         serverQueue.taskQueued = true;
@@ -357,12 +362,13 @@ class ConditionalWriterImpl implements ConditionalWriter {
     
     @Override
     public void run() {
-      TabletServerMutations<QCMutation> mutations = dequeue(location);
-      if (mutations != null)
-        sendToServer(location, mutations);
-      
-      //TODO if exception is thrown, will not reschedule
-      reschedule(this);
+      try {
+        TabletServerMutations<QCMutation> mutations = dequeue(location);
+        if (mutations != null)
+          sendToServer(location, mutations);
+      } finally {
+        reschedule(this);
+      }
     }
   }
   
@@ -380,6 +386,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
   private static class SessionID {
     long sessionID;
     boolean reserved;
+    long lastAccessTime;
   }
   
   private HashMap<String, SessionID> cachedSessionIDs = new HashMap<String, SessionID>();
@@ -392,8 +399,12 @@ class ConditionalWriterImpl implements ConditionalWriter {
         if (sid.reserved)
           throw new IllegalStateException();
         
-        sid.reserved = true;
-        return sid.sessionID;
+        if (System.currentTimeMillis() - sid.lastAccessTime > SESSION_CACHE_TIME) {
+          cachedSessionIDs.remove(location);
+        } else {
+          sid.reserved = true;
+          return sid.sessionID;
+        }
       }
     }
     
@@ -423,6 +434,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
       if(!sid.reserved)
         throw new IllegalStateException();
       sid.reserved = false;
+      sid.lastAccessTime = System.currentTimeMillis();
     }
   }
   
@@ -470,9 +482,6 @@ class ConditionalWriterImpl implements ConditionalWriter {
         }
       }
 
-
-      // TODO maybe have thrift call return bad extents
-
       for (KeyExtent ke : extentsToInvalidate) {
         locator.invalidateCache(ke);
       }
@@ -533,14 +542,14 @@ class ConditionalWriterImpl implements ConditionalWriter {
    */
   private void invalidateSession(long sessionId, String location, TabletServerMutations<QCMutation> mutations) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
     
-    // TODO could assume tserver will invalidate sessions after a given time period
-    
     ArrayList<QCMutation> mutList = new ArrayList<QCMutation>();
     
     for (List<QCMutation> tml : mutations.getMutations().values()) {
       mutList.addAll(tml);
     }
     
+    long sleepTime = 50;
+
     while (true) {
       Map<String,TabletServerMutations<QCMutation>> binnedMutations = new HashMap<String,TabletLocator.TabletServerMutations<QCMutation>>();
       List<QCMutation> failures = new ArrayList<QCMutation>();
@@ -565,7 +574,9 @@ class ConditionalWriterImpl implements ConditionalWriter {
         locator.invalidateCache(location);
       }
       
-      //TODO sleep
+      UtilWaitThread.sleep(sleepTime);
+      sleepTime = Math.min(2 * sleepTime, MAX_SLEEP);
+
     }
 	
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5e908585/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnQualifierFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnQualifierFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnQualifierFilter.java
index 1595f5a..d5ca3b4 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnQualifierFilter.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnQualifierFilter.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.core.iterators.system;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Set;
 
 import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.ByteSequence;
@@ -36,7 +37,7 @@ public class ColumnQualifierFilter extends Filter {
   
   public ColumnQualifierFilter() {}
   
-  public ColumnQualifierFilter(SortedKeyValueIterator<Key,Value> iterator, HashSet<Column> columns) {
+  public ColumnQualifierFilter(SortedKeyValueIterator<Key,Value> iterator, Set<Column> columns) {
     setSource(iterator);
     init(columns);
   }
@@ -63,7 +64,7 @@ public class ColumnQualifierFilter extends Filter {
     return cfset != null && cfset.contains(key.getColumnFamilyData());
   }
   
-  public void init(HashSet<Column> columns) {
+  public void init(Set<Column> columns) {
     this.columnFamilies = new HashSet<ByteSequence>();
     this.columnsQualifiers = new HashMap<ByteSequence,HashSet<ByteSequence>>();
     

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5e908585/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
index 1305be6..035d9b0 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
@@ -1651,7 +1651,7 @@ public class Tablet {
     }
   }
   
-  private Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int num, HashSet<Column> columns) throws IOException {
+  private Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int num, Set<Column> columns) throws IOException {
     
     // log.info("In nextBatch..");
     
@@ -1739,7 +1739,7 @@ public class Tablet {
     public long numBytes;
   }
   
-  Scanner createScanner(Range range, int num, HashSet<Column> columns, Authorizations authorizations, List<IterInfo> ssiList,
+  Scanner createScanner(Range range, int num, Set<Column> columns, Authorizations authorizations, List<IterInfo> ssiList,
       Map<String,Map<String,String>> ssio, boolean isolated, AtomicBoolean interruptFlag) {
     // do a test to see if this range falls within the tablet, if it does not
     // then clip will throw an exception
@@ -1873,14 +1873,14 @@ public class Tablet {
     // scan options
     Authorizations authorizations;
     byte[] defaultLabels;
-    HashSet<Column> columnSet;
+    Set<Column> columnSet;
     List<IterInfo> ssiList;
     Map<String,Map<String,String>> ssio;
     AtomicBoolean interruptFlag;
     int num;
     boolean isolated;
     
-    ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList,
+    ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, Set<Column> columnSet, List<IterInfo> ssiList,
         Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, boolean isolated) {
       this.num = num;
       this.authorizations = authorizations;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5e908585/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index 013639e..8f33488 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -750,6 +750,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     public TCredentials credentials;
     public Authorizations auths;
     public String tableId;
+    public AtomicBoolean interruptFlag;
+    
+    @Override
+    public void cleanup() {
+      interruptFlag.set(true);
+    }
   }
   
   private static class UpdateSession extends Session {
@@ -901,6 +907,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     
     WriteTracker writeTracker = new WriteTracker();
     
+    private RowLocks rowLocks = new RowLocks();
+
     ThriftClientHandler() {
       super(instance, watcher);
       log.debug(ThriftClientHandler.class.getName() + " created");
@@ -1730,16 +1738,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
         writeTracker.finishWrite(opid);
       }
     }
-    
-    private RowLocks rowLocks = new RowLocks();
 
-    private void checkConditions(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, Authorizations authorizations,
-        List<String> symbols) {
+    private void checkConditions(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession cs,
+        List<String> symbols) throws IOException {
       Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> iter = updates.entrySet().iterator();
       
-      // TODO use constant
-      HashSet<Column> columns = new HashSet<Column>();
-
       CompressedIterators compressedIters = new CompressedIterators(symbols);
 
       while (iter.hasNext()) {
@@ -1752,97 +1755,91 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
           iter.remove();
         } else {
           List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(entry.getValue().size());
-          
-          // TODO extract to method
-          for (ServerConditionalMutation scm : entry.getValue()) {
-            boolean add = true;
-            for(TCondition tc : scm.getConditions()){
-            
-              Range range;
-              if (tc.hasTimestamp)
-                range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs());
-              else
-                range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()));
-              
-              AtomicBoolean interruptFlag = new AtomicBoolean();
-
-              IterConfig ic = compressedIters.decompress(tc.iterators);
 
-              //TODO use one iterator per tablet, push checks into tablet?
-              Scanner scanner = tablet.createScanner(range, 1, columns, authorizations, ic.ssiList, ic.ssio, false, interruptFlag);
-              
-              try {
-                ScanBatch batch = scanner.read();
-                
-                Value val = null;
-                
-                for (KVEntry entry2 : batch.results) {
-                  val = entry2.getValue();
-                  break;
-                }
-                
-                if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) {
-                  results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED));
-                  add = false;
-                  break;
-                }
-                
-              } catch (TabletClosedException e) {
-                // TODO ignore rest of tablets mutations
-                results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
-                add = false;
-                break;
-              } catch (IterationInterruptedException iie) {
-                // TODO determine why this happened, ignore rest of tablets mutations?
-                results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
-                add = false;
-                break;
-              } catch (TooManyFilesException tmfe) {
-                // TODO handle differently?
-                results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
-                add = false;
-                break;
-              } catch (IOException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-              } finally {
-                scanner.close();
-              }
-            }
-            
-            if (add)
+          for (ServerConditionalMutation scm : entry.getValue()) {
+            if (checkCondition(results, cs, compressedIters, tablet, scm))
               okMutations.add(scm);
           }
           
-          // TODO just rebuild map
-          entry.getValue().clear();
-          entry.getValue().addAll(okMutations);
+          entry.setValue(okMutations);
         }
         
       }
     }
 
-    private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, TCredentials credentials) {
+    boolean checkCondition(ArrayList<TCMResult> results, ConditionalSession cs, CompressedIterators compressedIters,
+        Tablet tablet, ServerConditionalMutation scm) throws IOException {
+      boolean add = true;
+      
+      Set<Column> emptyCols = Collections.emptySet();
+
+      for(TCondition tc : scm.getConditions()){
+      
+        Range range;
+        if (tc.hasTimestamp)
+          range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs());
+        else
+          range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()));
+        
+        IterConfig ic = compressedIters.decompress(tc.iterators);
+
+        //TODO use one iterator per tablet, push checks into tablet?
+        Scanner scanner = tablet.createScanner(range, 1, emptyCols, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag);
+        
+        try {
+          ScanBatch batch = scanner.read();
+          
+          Value val = null;
+          
+          for (KVEntry entry2 : batch.results) {
+            val = entry2.getValue();
+            break;
+          }
+          
+          if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) {
+            results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED));
+            add = false;
+            break;
+          }
+          
+        } catch (TabletClosedException e) {
+          results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+          add = false;
+          break;
+        } catch (IterationInterruptedException iie) {
+          results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+          add = false;
+          break;
+        } catch (TooManyFilesException tmfe) {
+          results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+          add = false;
+          break;
+        }
+      }
+      return add;
+    }
+
+    private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession sess) {
       Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
       
       Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
 
       // TODO stats
 
+      boolean sessionCanceled = sess.interruptFlag.get();
+
       for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
         Tablet tablet = onlineTablets.get(entry.getKey());
-        if (tablet == null || tablet.isClosed()) {
+        if (tablet == null || tablet.isClosed() || sessionCanceled) {
           for (ServerConditionalMutation scm : entry.getValue())
             results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
         } else {
-          // TODO write tracker
-          
           try {
             
             List<Mutation> mutations = (List<Mutation>) (List<? extends Mutation>) entry.getValue();
             if (mutations.size() > 0) {
 
-              CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, credentials), mutations);
+              CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, sess.credentials), mutations);
               
               if (cs == null) {
                 for (ServerConditionalMutation scm : entry.getValue())
@@ -1889,8 +1886,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
 
     }
 
-    private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(TCredentials credentials, Authorizations authorizations,
-        Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, List<String> symbols) {
+    private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(ConditionalSession cs, Map<KeyExtent,List<ServerConditionalMutation>> updates,
+        ArrayList<TCMResult> results, List<String> symbols) throws IOException {
       // sort each list of mutations, this is done to avoid deadlock and doing seeks in order is more efficient and detect duplicate rows.
       ConditionalMutationSet.sortConditionalMutations(updates);
       
@@ -1902,8 +1899,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       // get as many locks as possible w/o blocking... defer any rows that are locked
       List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred);
       try {
-        checkConditions(updates, results, authorizations, symbols);
-        writeConditionalMutations(updates, results, credentials);
+        checkConditions(updates, results, cs, symbols);
+        writeConditionalMutations(updates, results, cs);
       } finally {
         rowLocks.releaseRowLocks(locks);
       }
@@ -1926,6 +1923,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       cs.auths = new Authorizations(authorizations);
       cs.credentials = credentials;
       cs.tableId = tableID;
+      cs.interruptFlag = new AtomicBoolean();
       
       return sessionManager.createSession(cs, false);
     }
@@ -1934,34 +1932,36 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
         throws NoSuchScanIDException, TException {
       // TODO sessions, should show up in list scans
-      // TODO timeout like scans do
       
       ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID);
       
       if(cs == null)
         throw new NoSuchScanIDException();
       
-      
+      Text tid = new Text(cs.tableId);
+      long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null)));
       
       try{
         Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations, Translator.TKET,
             new Translator.ListTranslator<TConditionalMutation,ServerConditionalMutation>(ServerConditionalMutation.TCMT));
-        
-        Text tid = new Text(cs.tableId);
+
         for(KeyExtent ke : updates.keySet())
           if(!ke.getTableId().equals(tid))
             throw new IllegalArgumentException("Unexpected table id "+tid+" != "+ke.getTableId());
         
         ArrayList<TCMResult> results = new ArrayList<TCMResult>();
         
-        Map<KeyExtent,List<ServerConditionalMutation>> deferred = conditionalUpdate(cs.credentials, cs.auths, updates, results, symbols);
+        Map<KeyExtent,List<ServerConditionalMutation>> deferred = conditionalUpdate(cs, updates, results, symbols);
   
         while (deferred.size() > 0) {
-          deferred = conditionalUpdate(cs.credentials, cs.auths, deferred, results, symbols);
+          deferred = conditionalUpdate(cs, deferred, results, symbols);
         }
   
         return results;
+      } catch (IOException ioe) {
+        throw new TException(ioe);
       }finally{
+        writeTracker.finishWrite(opid);
         sessionManager.unreserveSession(sessID);
       }
     }
@@ -1970,7 +1970,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException {
       //this method should wait for any running conditional update to complete
       //after this method returns a conditional update should not be able to start
-      ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID, true);
+      
+      ConditionalSession cs = (ConditionalSession) sessionManager.getSession(sessID);
+      if (cs != null)
+        cs.interruptFlag.set(true);
+      
+      cs = (ConditionalSession) sessionManager.reserveSession(sessID, true);
       if(cs != null)
         sessionManager.removeSession(sessID, true);
     }