You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/07/23 18:54:49 UTC

[24/50] added ability to invalidate server side conditional update sessions

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec537137/core/src/main/thrift/tabletserver.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift
index 5dfcb4b..c6d442c 100644
--- a/core/src/main/thrift/tabletserver.thrift
+++ b/core/src/main/thrift/tabletserver.thrift
@@ -166,9 +166,14 @@ service TabletClientService extends client.ClientService {
     throws (1:client.ThriftSecurityException sec, 
             2:NotServingTabletException nste, 
             3:ConstraintViolationException cve),
-  
-  list<data.TCMResult> conditionalUpdate(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:list<binary> authorizations, 4:data.CMBatch mutations, 5:list<string> symbols)
+
+  data.UpdateID startConditionalUpdate(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:list<binary> authorizations, 4:string tableID)
      throws (1:client.ThriftSecurityException sec);
+  
+  list<data.TCMResult> conditionalUpdate(1:trace.TInfo tinfo, 2:data.UpdateID sessID, 3:data.CMBatch mutations, 4:list<string> symbols)
+     throws (1:NoSuchScanIDException nssi);
+
+  void invalidateConditionalUpdate(1:trace.TInfo tinfo, 2:data.UpdateID sessID);
 
   // on success, returns an empty list
   list<data.TKeyExtent> bulkImport(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 4:i64 tid, 2:data.TabletFiles files, 5:bool setTime) throws (1:client.ThriftSecurityException sec),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec537137/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java b/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
index 465eb8e..94a8cd6 100644
--- a/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
+++ b/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
@@ -313,23 +313,10 @@ public class SecurityOperation {
     return hasTablePermission(credentials.getPrincipal(), table, TablePermission.WRITE, true);
   }
   
-  public boolean canConditionallyUpdate(TCredentials credentials, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols,
-      List<ByteBuffer> authorizations) throws ThriftSecurityException {
-    Set<TKeyExtent> ks = mutations.keySet();
-    
-    byte[] table = null;
-    
-    for (TKeyExtent tke : ks) {
-      if (table == null)
-        table = tke.getTable();
-      else if (!Arrays.equals(table, tke.getTable()))
-        return false;
-    }
+  public boolean canConditionallyUpdate(TCredentials credentials, String tableID, List<ByteBuffer> authorizations) throws ThriftSecurityException {
     
     authenticate(credentials);
     
-    String tableID = new String(table);
-    
     return hasTablePermission(credentials.getPrincipal(), tableID, TablePermission.WRITE, true)
         && hasTablePermission(credentials.getPrincipal(), tableID, TablePermission.READ, true);
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec537137/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index 4b905ce..ee1d1b6 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -397,9 +397,30 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       
     }
     
+    // Marks the session with this id as reserved and returns it; returns null when no such session exists.
+    // When wait is true, blocks until the current holder unreserves; otherwise a reserved session is an error.
+    synchronized Session reserveSession(long sessionId, boolean wait) {
+      Session session = sessions.get(sessionId);
+      if (session != null) {
+        while (wait && session.reserved) {
+          try {
+            wait(1000); // woken early by notifyAll() in unreserveSession
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+            throw new RuntimeException(e); // preserve the cause instead of dropping it
+          }
+        }
+        if (session.reserved)
+          throw new IllegalStateException();
+        session.reserved = true;
+      }
+      return session;
+    }
+    
     synchronized void unreserveSession(Session session) {
       if (!session.reserved)
         throw new IllegalStateException();
+      notifyAll(); // wake any thread blocked in reserveSession(sessionId, true)
       session.reserved = false;
       session.lastAccessTime = System.currentTimeMillis();
     }
@@ -409,7 +430,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       if (session != null)
         unreserveSession(session);
     }
-    
+        
     synchronized Session getSession(long sessionId) {
       Session session = sessions.get(sessionId);
       if (session != null)
@@ -418,9 +439,15 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     }
     
     Session removeSession(long sessionId) {
+      return removeSession(sessionId, false);
+    }
+    
+    Session removeSession(long sessionId, boolean unreserve) {
       Session session = null;
       synchronized (this) {
         session = sessions.remove(sessionId);
+        if(unreserve && session != null)
+          unreserveSession(session);
       }
       
       // do clean up out side of lock..
@@ -719,6 +746,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     
   }
   
+  private static class ConditionalSession extends Session {
+    public TCredentials credentials; // caller credentials captured by startConditionalUpdate
+    public Authorizations auths; // authorizations requested when the session was started
+    public String tableId; // the only table this session's mutations may target
+  }
+  
   private static class UpdateSession extends Session {
     public Tablet currentTablet;
     public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>();
@@ -1856,7 +1889,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
 
     }
 
-    private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(TCredentials credentials, List<ByteBuffer> authorizations,
+    private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(TCredentials credentials, Authorizations authorizations,
         Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, List<String> symbols) {
       // sort each list of mutations, this is done to avoid deadlock and doing seeks in order is more efficient and detect duplicate rows.
       ConditionalMutationSet.sortConditionalMutations(updates);
@@ -1869,7 +1902,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       // get as many locks as possible w/o blocking... defer any rows that are locked
       List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred);
       try {
-        checkConditions(updates, results, new Authorizations(authorizations), symbols);
+        checkConditions(updates, results, authorizations, symbols);
         writeConditionalMutations(updates, results, credentials);
       } finally {
         rowLocks.releaseRowLocks(locks);
@@ -1878,11 +1911,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     }
     
     @Override
-    public List<TCMResult> conditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations,
-        Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols) throws ThriftSecurityException {
+    public long startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableID) throws ThriftSecurityException, TException {
       
       Authorizations userauths = null;
-      if (!security.canConditionallyUpdate(credentials, mutations, symbols, authorizations))
+      if (!security.canConditionallyUpdate(credentials, tableID, authorizations))
         throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
       
       userauths = security.getUserAuthorizations(credentials);
@@ -1890,23 +1922,58 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
         if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
           throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
 
+      ConditionalSession cs = new ConditionalSession();
+      cs.auths = new Authorizations(authorizations);
+      cs.credentials = credentials;
+      cs.tableId = tableID;
+      
+      return sessionManager.createSession(cs, false);
+    }
+
+    @Override
+    public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
+        throws NoSuchScanIDException, TException {
       // TODO sessions, should show up in list scans
       // TODO timeout like scans do
       
-      Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations, Translator.TKET,
-          new Translator.ListTranslator<TConditionalMutation,ServerConditionalMutation>(ServerConditionalMutation.TCMT));
+      ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID);
       
-      ArrayList<TCMResult> results = new ArrayList<TCMResult>();
+      if(cs == null)
+        throw new NoSuchScanIDException();
       
-      Map<KeyExtent,List<ServerConditionalMutation>> deferred = conditionalUpdate(credentials, authorizations, updates, results, symbols);
-
-      while (deferred.size() > 0) {
-        deferred = conditionalUpdate(credentials, authorizations, deferred, results, symbols);
+      
+      
+      try{
+        Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations, Translator.TKET,
+            new Translator.ListTranslator<TConditionalMutation,ServerConditionalMutation>(ServerConditionalMutation.TCMT));
+        
+        Text tid = new Text(cs.tableId);
+        for(KeyExtent ke : updates.keySet())
+          if(!ke.getTableId().equals(tid))
+            throw new IllegalArgumentException("Unexpected table id "+tid+" != "+ke.getTableId());
+        
+        ArrayList<TCMResult> results = new ArrayList<TCMResult>();
+        
+        Map<KeyExtent,List<ServerConditionalMutation>> deferred = conditionalUpdate(cs.credentials, cs.auths, updates, results, symbols);
+  
+        while (deferred.size() > 0) {
+          deferred = conditionalUpdate(cs.credentials, cs.auths, deferred, results, symbols);
+        }
+  
+        return results;
+      }finally{
+        sessionManager.removeSession(sessID, true);
       }
-
-      return results;
     }
 
+    @Override
+    public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException {
+      Session session = sessionManager.reserveSession(sessID, true); // waits out any running conditional update
+      if (session instanceof ConditionalSession)
+        sessionManager.removeSession(sessID, true); // once removed, no conditional update can start on it
+      else if (session != null)
+        sessionManager.unreserveSession(session); // another session type: release it, do not cast and leak
+    }
 
     @Override
     public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, ByteBuffer splitPoint) throws NotServingTabletException,

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec537137/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index f8b32de..67e7249 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -52,6 +52,7 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
 import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
@@ -204,10 +205,21 @@ public class NullTserver {
     }
 
     @Override
-    public List<TCMResult> conditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations,
-        Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols) throws TException {
+    public long startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableID) throws ThriftSecurityException,
+        TException {
+      return 0;
+    }
+
+    @Override
+    public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
+        throws NoSuchScanIDException, TException {
       return null;
     }
+
+    @Override
+    public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException {
+      
+    }
   }
   
   static class Opts extends Help {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec537137/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
index 33dc458..65a5636 100644
--- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
@@ -1127,6 +1127,7 @@ public class ConditionalWriterTest {
     TabletLocator locator = TabletLocator.getLocator(zki, new Text(Tables.getNameToIdMap(zki).get(table)));
     while (locator.locateTablet(new Text("a"), false, false, CredentialHelper.create("root", new PasswordToken(secret), zki.getInstanceID())) != null) {
       UtilWaitThread.sleep(50);
+      locator.invalidateCache();
     }
   }