You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/07/23 18:54:49 UTC
[24/50] added ability to invalidate server side conditional update
sessions
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec537137/core/src/main/thrift/tabletserver.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift
index 5dfcb4b..c6d442c 100644
--- a/core/src/main/thrift/tabletserver.thrift
+++ b/core/src/main/thrift/tabletserver.thrift
@@ -166,9 +166,14 @@ service TabletClientService extends client.ClientService {
throws (1:client.ThriftSecurityException sec,
2:NotServingTabletException nste,
3:ConstraintViolationException cve),
-
- list<data.TCMResult> conditionalUpdate(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:list<binary> authorizations, 4:data.CMBatch mutations, 5:list<string> symbols)
+
+ data.UpdateID startConditionalUpdate(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:list<binary> authorizations, 4:string tableID)
throws (1:client.ThriftSecurityException sec);
+
+ list<data.TCMResult> conditionalUpdate(1:trace.TInfo tinfo, 2:data.UpdateID sessID, 3:data.CMBatch mutations, 4:list<string> symbols)
+ throws (1:NoSuchScanIDException nssi);
+
+ void invalidateConditionalUpdate(1:trace.TInfo tinfo, 2:data.UpdateID sessID);
// on success, returns an empty list
list<data.TKeyExtent> bulkImport(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 4:i64 tid, 2:data.TabletFiles files, 5:bool setTime) throws (1:client.ThriftSecurityException sec),
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec537137/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java b/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
index 465eb8e..94a8cd6 100644
--- a/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
+++ b/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
@@ -313,23 +313,10 @@ public class SecurityOperation {
return hasTablePermission(credentials.getPrincipal(), table, TablePermission.WRITE, true);
}
- public boolean canConditionallyUpdate(TCredentials credentials, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols,
- List<ByteBuffer> authorizations) throws ThriftSecurityException {
- Set<TKeyExtent> ks = mutations.keySet();
-
- byte[] table = null;
-
- for (TKeyExtent tke : ks) {
- if (table == null)
- table = tke.getTable();
- else if (!Arrays.equals(table, tke.getTable()))
- return false;
- }
+ public boolean canConditionallyUpdate(TCredentials credentials, String tableID, List<ByteBuffer> authorizations) throws ThriftSecurityException {
authenticate(credentials);
- String tableID = new String(table);
-
return hasTablePermission(credentials.getPrincipal(), tableID, TablePermission.WRITE, true)
&& hasTablePermission(credentials.getPrincipal(), tableID, TablePermission.READ, true);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec537137/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index 4b905ce..ee1d1b6 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -397,9 +397,30 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
+ synchronized Session reserveSession(long sessionId, boolean wait) {
+ Session session = sessions.get(sessionId);
+ if (session != null) {
+ while(wait && session.reserved){
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException();
+ }
+ }
+
+ if (session.reserved)
+ throw new IllegalStateException();
+ session.reserved = true;
+ }
+
+ return session;
+
+ }
+
synchronized void unreserveSession(Session session) {
if (!session.reserved)
throw new IllegalStateException();
+ notifyAll();
session.reserved = false;
session.lastAccessTime = System.currentTimeMillis();
}
@@ -409,7 +430,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (session != null)
unreserveSession(session);
}
-
+
synchronized Session getSession(long sessionId) {
Session session = sessions.get(sessionId);
if (session != null)
@@ -418,9 +439,15 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
Session removeSession(long sessionId) {
+ return removeSession(sessionId, false);
+ }
+
+ Session removeSession(long sessionId, boolean unreserve) {
Session session = null;
synchronized (this) {
session = sessions.remove(sessionId);
+ if(unreserve && session != null)
+ unreserveSession(session);
}
// do clean up out side of lock..
@@ -719,6 +746,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
+ private static class ConditionalSession extends Session {
+ public TCredentials credentials;
+ public Authorizations auths;
+ public String tableId;
+ }
+
private static class UpdateSession extends Session {
public Tablet currentTablet;
public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>();
@@ -1856,7 +1889,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
- private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(TCredentials credentials, List<ByteBuffer> authorizations,
+ private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(TCredentials credentials, Authorizations authorizations,
Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, List<String> symbols) {
// sort each list of mutations, this is done to avoid deadlock and doing seeks in order is more efficient and detect duplicate rows.
ConditionalMutationSet.sortConditionalMutations(updates);
@@ -1869,7 +1902,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// get as many locks as possible w/o blocking... defer any rows that are locked
List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred);
try {
- checkConditions(updates, results, new Authorizations(authorizations), symbols);
+ checkConditions(updates, results, authorizations, symbols);
writeConditionalMutations(updates, results, credentials);
} finally {
rowLocks.releaseRowLocks(locks);
@@ -1878,11 +1911,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
@Override
- public List<TCMResult> conditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations,
- Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols) throws ThriftSecurityException {
+ public long startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableID) throws ThriftSecurityException, TException {
Authorizations userauths = null;
- if (!security.canConditionallyUpdate(credentials, mutations, symbols, authorizations))
+ if (!security.canConditionallyUpdate(credentials, tableID, authorizations))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
userauths = security.getUserAuthorizations(credentials);
@@ -1890,23 +1922,58 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
+ ConditionalSession cs = new ConditionalSession();
+ cs.auths = new Authorizations(authorizations);
+ cs.credentials = credentials;
+ cs.tableId = tableID;
+
+ return sessionManager.createSession(cs, false);
+ }
+
+ @Override
+ public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
+ throws NoSuchScanIDException, TException {
// TODO sessions, should show up in list scans
// TODO timeout like scans do
- Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations, Translator.TKET,
- new Translator.ListTranslator<TConditionalMutation,ServerConditionalMutation>(ServerConditionalMutation.TCMT));
+ ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID);
- ArrayList<TCMResult> results = new ArrayList<TCMResult>();
+ if(cs == null)
+ throw new NoSuchScanIDException();
- Map<KeyExtent,List<ServerConditionalMutation>> deferred = conditionalUpdate(credentials, authorizations, updates, results, symbols);
-
- while (deferred.size() > 0) {
- deferred = conditionalUpdate(credentials, authorizations, deferred, results, symbols);
+
+
+ try{
+ Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations, Translator.TKET,
+ new Translator.ListTranslator<TConditionalMutation,ServerConditionalMutation>(ServerConditionalMutation.TCMT));
+
+ Text tid = new Text(cs.tableId);
+ for(KeyExtent ke : updates.keySet())
+ if(!ke.getTableId().equals(tid))
+ throw new IllegalArgumentException("Unexpected table id "+tid+" != "+ke.getTableId());
+
+ ArrayList<TCMResult> results = new ArrayList<TCMResult>();
+
+ Map<KeyExtent,List<ServerConditionalMutation>> deferred = conditionalUpdate(cs.credentials, cs.auths, updates, results, symbols);
+
+ while (deferred.size() > 0) {
+ deferred = conditionalUpdate(cs.credentials, cs.auths, deferred, results, symbols);
+ }
+
+ return results;
+ }finally{
+ sessionManager.removeSession(sessID, true);
}
-
- return results;
}
+ @Override
+ public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException {
+ //this method should wait for any running conditional update to complete
+ //after this method returns a conditional update should not be able to start
+ ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID, true);
+ if(cs != null)
+ sessionManager.removeSession(sessID, true);
+ }
@Override
public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, ByteBuffer splitPoint) throws NotServingTabletException,
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec537137/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index f8b32de..67e7249 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -52,6 +52,7 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
@@ -204,10 +205,21 @@ public class NullTserver {
}
@Override
- public List<TCMResult> conditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations,
- Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols) throws TException {
+ public long startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableID) throws ThriftSecurityException,
+ TException {
+ return 0;
+ }
+
+ @Override
+ public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
+ throws NoSuchScanIDException, TException {
return null;
}
+
+ @Override
+ public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException {
+
+ }
}
static class Opts extends Help {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec537137/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
index 33dc458..65a5636 100644
--- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
@@ -1127,6 +1127,7 @@ public class ConditionalWriterTest {
TabletLocator locator = TabletLocator.getLocator(zki, new Text(Tables.getNameToIdMap(zki).get(table)));
while (locator.locateTablet(new Text("a"), false, false, CredentialHelper.create("root", new PasswordToken(secret), zki.getInstanceID())) != null) {
UtilWaitThread.sleep(50);
+ locator.invalidateCache();
}
}