You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/07/23 18:54:51 UTC

[26/50] git commit: added ability to invalidate server side conditional update sessions

added ability to invalidate server side conditional update sessions


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ec537137
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ec537137
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ec537137

Branch: refs/heads/ACCUMULO-1000
Commit: ec537137aa958cf87d8d11ff5fdfe05c78dea624
Parents: a169064
Author: keith@deenlo.com <ke...@deenlo.com>
Authored: Fri Jul 19 16:00:13 2013 -0400
Committer: keith@deenlo.com <ke...@deenlo.com>
Committed: Fri Jul 19 16:00:13 2013 -0400

----------------------------------------------------------------------
 .../core/client/impl/ConditionalWriterImpl.java |  120 +-
 .../thrift/TabletClientService.java             | 2831 +++++++++++++++---
 core/src/main/thrift/tabletserver.thrift        |    9 +-
 .../server/security/SecurityOperation.java      |   15 +-
 .../server/tabletserver/TabletServer.java       |   97 +-
 .../test/performance/thrift/NullTserver.java    |   16 +-
 .../accumulo/test/ConditionalWriterTest.java    |    1 +
 7 files changed, 2571 insertions(+), 518 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec537137/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
index f0d6108..31403fb 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@ -34,10 +34,12 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.ConditionalWriter;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.TableDeletedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
@@ -57,6 +59,7 @@ import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.security.VisibilityEvaluator;
 import org.apache.accumulo.core.security.VisibilityParseException;
 import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.util.BadArgumentException;
 import org.apache.accumulo.core.util.ByteBufferUtil;
@@ -177,7 +180,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
     return queue;
   }
   
-  private void queueFailed(List<QCMutation> mutations) {
+  private void queueRetry(List<QCMutation> mutations) {
     for (QCMutation qcm : mutations) {
       qcm.resetDelay();
     }
@@ -208,7 +211,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
     }
     
     if (failures.size() > 0)
-      queueFailed(failures);
+      queueRetry(failures);
 
     for (Entry<String,TabletServerMutations<QCMutation>> entry : binnedMutations.entrySet()) {
       queue(entry.getKey(), entry.getValue());
@@ -350,6 +353,8 @@ class ConditionalWriterImpl implements ConditionalWriter {
     Map<Long,CMK> cmidToCm = new HashMap<Long,CMK>();
     MutableLong cmid = new MutableLong(0);
 
+    Long sessionId = null;
+    
     try {
       client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
 
@@ -357,9 +362,11 @@ class ConditionalWriterImpl implements ConditionalWriter {
 
       CompressedIterators compressedIters = new CompressedIterators();
       convertMutations(mutations, cmidToCm, cmid, tmutations, compressedIters);
-
-      List<TCMResult> tresults = client.conditionalUpdate(tinfo, credentials, ByteBufferUtil.toByteBuffers(auths.getAuthorizations()), tmutations,
-          compressedIters.getSymbolTable());
+      
+      //TODO create a session per tserver and keep reusing it
+      sessionId = client.startConditionalUpdate(tinfo, credentials, ByteBufferUtil.toByteBuffers(auths.getAuthorizations()), tableId);
+      
+      List<TCMResult> tresults = client.conditionalUpdate(tinfo, sessionId, tmutations, compressedIters.getSymbolTable());
 
       HashSet<KeyExtent> extentsToInvalidate = new HashSet<KeyExtent>();
 
@@ -383,27 +390,108 @@ class ConditionalWriterImpl implements ConditionalWriter {
         locator.invalidateCache(ke);
       }
 
-      queueFailed(ignored);
+      queueRetry(ignored);
 
+    } catch (NoSuchScanIDException nssie){
+    	queueRetry(cmidToCm);
     } catch (ThriftSecurityException tse) {
       AccumuloSecurityException ase = new AccumuloSecurityException(credentials.getPrincipal(), tse.getCode(), Tables.getPrintableTableInfoFromId(instance,
           tableId), tse);
-      for (CMK cmk : cmidToCm.values())
-        cmk.cm.resultQueue.add(new Result(ase, cmk.cm, location));
+      queueException(location, cmidToCm, ase);
     } catch (TTransportException e) {
       locator.invalidateCache(location);
-      for (CMK cmk : cmidToCm.values())
-        cmk.cm.resultQueue.add(new Result(Status.UNKNOWN, cmk.cm, location));
+      invalidateSession(location, mutations, cmidToCm, sessionId);
     } catch (TApplicationException tae) {
-      for (CMK cmk : cmidToCm.values())
-        cmk.cm.resultQueue.add(new Result(new AccumuloServerException(location, tae), cmk.cm, location));
+      queueException(location, cmidToCm, new AccumuloServerException(location, tae));
     } catch (TException e) {
       locator.invalidateCache(location);
-      for (CMK cmk : cmidToCm.values())
-        cmk.cm.resultQueue.add(new Result(Status.UNKNOWN, cmk.cm, location));
+      invalidateSession(location, mutations, cmidToCm, sessionId);
     } catch (Exception e) {
-      for (CMK cmk : cmidToCm.values())
-        cmk.cm.resultQueue.add(new Result(e, cmk.cm, location));
+      queueException(location, cmidToCm, e);
+    } finally {
+      ThriftUtil.returnClient((TServiceClient) client);
+    }
+  }
+
+  private void queueRetry(Map<Long,CMK> cmidToCm) { // re-queues every mutation tracked in cmidToCm by delegating to queueRetry(List)
+    ArrayList<QCMutation> ignored = new ArrayList<QCMutation>();
+    for (CMK cmk : cmidToCm.values())
+    	ignored.add(cmk.cm); // unwrap each CMK to the QCMutation it carries
+    queueRetry(ignored);
+  }
+
+  private void queueException(String location, Map<Long,CMK> cmidToCm, Exception e) { // reports e as the Result of every mutation in cmidToCm
+    for (CMK cmk : cmidToCm.values())
+      cmk.cm.resultQueue.add(new Result(e, cmk.cm, location)); // the Result is added to that mutation's own resultQueue
+  }
+
+  private void invalidateSession(String location, TabletServerMutations<QCMutation> mutations, Map<Long,CMK> cmidToCm, Long sessionId) { // used by the TTransportException/TException handlers of the send path
+    if(sessionId == null){ // sessionId stays null when startConditionalUpdate never returned an id, so the mutations are just re-queued
+      queueRetry(cmidToCm);
+    }else{
+      try {
+        invalidateSession(sessionId, location, mutations); // returns normally once the session is invalidated or the mutations no longer bin to location
+        for (CMK cmk : cmidToCm.values())
+          cmk.cm.resultQueue.add(new Result(Status.UNKNOWN, cmk.cm, location)); // every mutation in the map is reported with Status.UNKNOWN
+      }catch(Exception e2){
+        queueException(location, cmidToCm, e2); // the invalidation attempt itself failed; report that exception as the result instead
+      }
+    }
+  }
+  
+  /*
+   * Ensures that a conditional mutation whose status is unknown will not be executed later. This allows a user to read the row when the status is unknown
+   * without having to worry about the tserver applying the mutation after the scan.
+   * 
+   * If a conditional mutation is taking a long time to process, then this method will wait for it to finish... unless this exceeds the timeout.
+   */
+  private void invalidateSession(long sessionId, String location, TabletServerMutations<QCMutation> mutations) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    
+    // TODO could assume tserver will invalidate sessions after a given time period
+    
+    ArrayList<QCMutation> mutList = new ArrayList<QCMutation>(); // flattened list of all mutations held by the TabletServerMutations argument
+    
+    for (List<QCMutation> tml : mutations.getMutations().values()) {
+      mutList.addAll(tml);
+    }
+    
+    while (true) { // exits by return (session invalidated or location no longer binned) or by a thrown exception
+      Map<String,TabletServerMutations<QCMutation>> binnedMutations = new HashMap<String,TabletLocator.TabletServerMutations<QCMutation>>();
+      List<QCMutation> failures = new ArrayList<QCMutation>();
+      
+      locator.binMutations(mutList, binnedMutations, failures, credentials); // re-bin on every pass to see which locations the mutations map to now
+      
+      // TODO do failures matter? not if failures only indicates tablets are not assigned
+      
+      if (!binnedMutations.containsKey(location)) {
+        // the tablets are at different locations now, so there is no need to invalidate the session
+        // TODO there could be a case where the tablet comes back to this tserver and then the UNKNOWN conditional mutation goes through
+        return;
+      }
+      
+      try {
+        // if the mutation is currently processing, this method will block until it's done or times out
+        invalidateSession(sessionId, location);
+        return;
+      } catch (TApplicationException tae) {
+        throw new AccumuloServerException(location, tae); // TApplicationException is not retried; it is wrapped and propagated to the caller
+      } catch (TException e) {
+        locator.invalidateCache(location); // any other TException: invalidate the locator cache for this location, then loop again
+      }
+      
+      // TODO sleep before retrying; as written the loop retries immediately after a TException
+    }
+	
+  }
+  
+  private void invalidateSession(long sessionId, String location) throws TException {
+    TabletClientService.Iface client = null;
+    
+    TInfo tinfo = Tracer.traceInfo();
+    
+    try {
+      client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
+      client.invalidateConditionalUpdate(tinfo, sessionId);
     } finally {
       ThriftUtil.returnClient((TServiceClient) client);
     }