You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/07/23 18:54:51 UTC
[26/50] git commit: added ability to invalidate server side
conditional update sessions
added ability to invalidate server side conditional update sessions
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ec537137
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ec537137
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ec537137
Branch: refs/heads/ACCUMULO-1000
Commit: ec537137aa958cf87d8d11ff5fdfe05c78dea624
Parents: a169064
Author: keith@deenlo.com <ke...@deenlo.com>
Authored: Fri Jul 19 16:00:13 2013 -0400
Committer: keith@deenlo.com <ke...@deenlo.com>
Committed: Fri Jul 19 16:00:13 2013 -0400
----------------------------------------------------------------------
.../core/client/impl/ConditionalWriterImpl.java | 120 +-
.../thrift/TabletClientService.java | 2831 +++++++++++++++---
core/src/main/thrift/tabletserver.thrift | 9 +-
.../server/security/SecurityOperation.java | 15 +-
.../server/tabletserver/TabletServer.java | 97 +-
.../test/performance/thrift/NullTserver.java | 16 +-
.../accumulo/test/ConditionalWriterTest.java | 1 +
7 files changed, 2571 insertions(+), 518 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec537137/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
index f0d6108..31403fb 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@ -34,10 +34,12 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.TableDeletedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
@@ -57,6 +59,7 @@ import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.security.VisibilityEvaluator;
import org.apache.accumulo.core.security.VisibilityParseException;
import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.util.BadArgumentException;
import org.apache.accumulo.core.util.ByteBufferUtil;
@@ -177,7 +180,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
return queue;
}
- private void queueFailed(List<QCMutation> mutations) {
+ private void queueRetry(List<QCMutation> mutations) {
for (QCMutation qcm : mutations) {
qcm.resetDelay();
}
@@ -208,7 +211,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
}
if (failures.size() > 0)
- queueFailed(failures);
+ queueRetry(failures);
for (Entry<String,TabletServerMutations<QCMutation>> entry : binnedMutations.entrySet()) {
queue(entry.getKey(), entry.getValue());
@@ -350,6 +353,8 @@ class ConditionalWriterImpl implements ConditionalWriter {
Map<Long,CMK> cmidToCm = new HashMap<Long,CMK>();
MutableLong cmid = new MutableLong(0);
+ Long sessionId = null;
+
try {
client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
@@ -357,9 +362,11 @@ class ConditionalWriterImpl implements ConditionalWriter {
CompressedIterators compressedIters = new CompressedIterators();
convertMutations(mutations, cmidToCm, cmid, tmutations, compressedIters);
-
- List<TCMResult> tresults = client.conditionalUpdate(tinfo, credentials, ByteBufferUtil.toByteBuffers(auths.getAuthorizations()), tmutations,
- compressedIters.getSymbolTable());
+
+ //TODO create a session per tserver and keep reusing it
+ sessionId = client.startConditionalUpdate(tinfo, credentials, ByteBufferUtil.toByteBuffers(auths.getAuthorizations()), tableId);
+
+ List<TCMResult> tresults = client.conditionalUpdate(tinfo, sessionId, tmutations, compressedIters.getSymbolTable());
HashSet<KeyExtent> extentsToInvalidate = new HashSet<KeyExtent>();
@@ -383,27 +390,108 @@ class ConditionalWriterImpl implements ConditionalWriter {
locator.invalidateCache(ke);
}
- queueFailed(ignored);
+ queueRetry(ignored);
+ } catch (NoSuchScanIDException nssie){
+ queueRetry(cmidToCm);
} catch (ThriftSecurityException tse) {
AccumuloSecurityException ase = new AccumuloSecurityException(credentials.getPrincipal(), tse.getCode(), Tables.getPrintableTableInfoFromId(instance,
tableId), tse);
- for (CMK cmk : cmidToCm.values())
- cmk.cm.resultQueue.add(new Result(ase, cmk.cm, location));
+ queueException(location, cmidToCm, ase);
} catch (TTransportException e) {
locator.invalidateCache(location);
- for (CMK cmk : cmidToCm.values())
- cmk.cm.resultQueue.add(new Result(Status.UNKNOWN, cmk.cm, location));
+ invalidateSession(location, mutations, cmidToCm, sessionId);
} catch (TApplicationException tae) {
- for (CMK cmk : cmidToCm.values())
- cmk.cm.resultQueue.add(new Result(new AccumuloServerException(location, tae), cmk.cm, location));
+ queueException(location, cmidToCm, new AccumuloServerException(location, tae));
} catch (TException e) {
locator.invalidateCache(location);
- for (CMK cmk : cmidToCm.values())
- cmk.cm.resultQueue.add(new Result(Status.UNKNOWN, cmk.cm, location));
+ invalidateSession(location, mutations, cmidToCm, sessionId);
} catch (Exception e) {
- for (CMK cmk : cmidToCm.values())
- cmk.cm.resultQueue.add(new Result(e, cmk.cm, location));
+ queueException(location, cmidToCm, e);
+ } finally {
+ ThriftUtil.returnClient((TServiceClient) client);
+ }
+ }
+
+ private void queueRetry(Map<Long,CMK> cmidToCm) {
+ ArrayList<QCMutation> ignored = new ArrayList<QCMutation>();
+ for (CMK cmk : cmidToCm.values())
+ ignored.add(cmk.cm);
+ queueRetry(ignored);
+ }
+
+ private void queueException(String location, Map<Long,CMK> cmidToCm, Exception e) {
+ for (CMK cmk : cmidToCm.values())
+ cmk.cm.resultQueue.add(new Result(e, cmk.cm, location));
+ }
+
+ private void invalidateSession(String location, TabletServerMutations<QCMutation> mutations, Map<Long,CMK> cmidToCm, Long sessionId) {
+ if(sessionId == null){
+ queueRetry(cmidToCm);
+ }else{
+ try {
+ invalidateSession(sessionId, location, mutations);
+ for (CMK cmk : cmidToCm.values())
+ cmk.cm.resultQueue.add(new Result(Status.UNKNOWN, cmk.cm, location));
+ }catch(Exception e2){
+ queueException(location, cmidToCm, e2);
+ }
+ }
+ }
+
+ /*
+ * The purpose of this code is to ensure that a conditional mutation will not execute when its status is unknown. This allows a user to read the row when the
+ * status is unknown and not have to worry about the tserver applying the mutation after the scan.
+ *
+ * If a conditional mutation is taking a long time to process, then this method will wait for it to finish... unless this exceeds timeout.
+ */
+ private void invalidateSession(long sessionId, String location, TabletServerMutations<QCMutation> mutations) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+
+ // TODO could assume tserver will invalidate sessions after a given time period
+
+ ArrayList<QCMutation> mutList = new ArrayList<QCMutation>();
+
+ for (List<QCMutation> tml : mutations.getMutations().values()) {
+ mutList.addAll(tml);
+ }
+
+ while (true) {
+ Map<String,TabletServerMutations<QCMutation>> binnedMutations = new HashMap<String,TabletLocator.TabletServerMutations<QCMutation>>();
+ List<QCMutation> failures = new ArrayList<QCMutation>();
+
+ locator.binMutations(mutList, binnedMutations, failures, credentials);
+
+ // TODO do failures matter? not if failures only indicates tablets are not assigned
+
+ if (!binnedMutations.containsKey(location)) {
+ // the tablets are at different locations now, so there is no need to invalidate the session
+ // TODO could be a case where tablet comes back to tserver and then UNKNOW condMut goes through
+ return;
+ }
+
+ try {
+ // if the mutation is currently processing, this method will block until its done or times out
+ invalidateSession(sessionId, location);
+ return;
+ } catch (TApplicationException tae) {
+ throw new AccumuloServerException(location, tae);
+ } catch (TException e) {
+ locator.invalidateCache(location);
+ }
+
+ //TODO sleep
+ }
+
+ }
+
+ private void invalidateSession(long sessionId, String location) throws TException {
+ TabletClientService.Iface client = null;
+
+ TInfo tinfo = Tracer.traceInfo();
+
+ try {
+ client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
+ client.invalidateConditionalUpdate(tinfo, sessionId);
} finally {
ThriftUtil.returnClient((TServiceClient) client);
}