You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/07/23 18:55:14 UTC
[49/50] git commit: ACCUMULO-1000 Added timeout & config to
conditional writer. Added unit test
ACCUMULO-1000 Added timeout & config to conditional writer. Added unit test
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/79019ef0
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/79019ef0
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/79019ef0
Branch: refs/heads/ACCUMULO-1000
Commit: 79019ef0477b76966e2aff7259443aa9cd2f1cce
Parents: 5183ae4
Author: Keith Turner <kt...@apache.org>
Authored: Tue Jul 23 12:07:41 2013 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Tue Jul 23 12:11:06 2013 -0400
----------------------------------------------------------------------
.../accumulo/core/client/ConditionalWriter.java | 35 ++----
.../core/client/ConditionalWriterConfig.java | 118 +++++++++++++++++++
.../apache/accumulo/core/client/Connector.java | 9 +-
.../core/client/impl/ConditionalWriterImpl.java | 96 ++++++++++-----
.../core/client/impl/ConnectorImpl.java | 7 +-
.../core/client/mock/MockConnector.java | 3 +-
.../server/tabletserver/TabletServer.java | 2 +-
.../accumulo/test/FaultyConditionalWriter.java | 9 --
.../accumulo/test/functional/SlowIterator.java | 24 +++-
.../accumulo/test/ConditionalWriterTest.java | 115 ++++++++++++++----
10 files changed, 313 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
index b434463..db29492 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
@@ -18,8 +18,8 @@
package org.apache.accumulo.core.client;
import java.util.Iterator;
-import java.util.concurrent.TimeUnit;
+import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
import org.apache.accumulo.core.data.ConditionalMutation;
/**
@@ -48,11 +48,11 @@ public interface ConditionalWriter {
public Status getStatus() throws AccumuloException, AccumuloSecurityException {
if (status == null) {
if (exception instanceof AccumuloException)
- throw (AccumuloException) exception;
- if (exception instanceof AccumuloSecurityException)
- throw (AccumuloSecurityException) exception;
- if (exception instanceof RuntimeException)
- throw (RuntimeException) exception;
+ throw new AccumuloException(exception);
+ if (exception instanceof AccumuloSecurityException) {
+ AccumuloSecurityException ase = (AccumuloSecurityException) exception;
+ throw new AccumuloSecurityException(ase.getUser(), SecurityErrorCode.valueOf(ase.getSecurityErrorCode().name()), ase.getTableInfo(), ase);
+ }
else
throw new AccumuloException(exception);
}
@@ -94,33 +94,12 @@ public interface ConditionalWriter {
* A condition contained a column visibility that could never be seen
*/
INVISIBLE_VISIBILITY,
- /**
- * nothing was done with this mutation, this is caused by previous mutations failing in some way like timing out
- */
- IGNORED
+
}
public abstract Iterator<Result> write(Iterator<ConditionalMutation> mutations);
public abstract Result write(ConditionalMutation mutation);
-
- /**
- * This setting determines how long a scanner will automatically retry when a failure occurs. By default a scanner will retry forever.
- *
- * Setting to zero or Long.MAX_VALUE and TimeUnit.MILLISECONDS means to retry forever.
- *
- * @param timeOut
- * @param timeUnit
- * determines how timeout is interpreted
- */
- public void setTimeout(long timeOut, TimeUnit timeUnit);
-
- /**
- * Returns the setting for how long a scanner will automatically retry when a failure occurs.
- *
- * @return the timeout configured for this scanner
- */
- public long getTimeout(TimeUnit timeUnit);
public void close();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
new file mode 100644
index 0000000..f2a91ea
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.ArgumentChecker;
+
+/**
+ * Configuration for a {@link ConditionalWriter}: authorizations, retry timeout and maximum write threads. Each setter returns {@code this} for chaining.
+ * @since 1.6.0
+ */
+public class ConditionalWriterConfig {
+
+ private static final Long DEFAULT_TIMEOUT = Long.MAX_VALUE; // in milliseconds; Long.MAX_VALUE is the "no timeout" value
+ private Long timeout = null; // stored in milliseconds; null until setTimeout is called, getTimeout then falls back to DEFAULT_TIMEOUT
+
+ private static final Integer DEFAULT_MAX_WRITE_THREADS = 3;
+ private Integer maxWriteThreads = null; // null until setMaxWriteThreads is called, getMaxWriteThreads then falls back to the default
+
+ private Authorizations auths = Authorizations.EMPTY;
+
+ /**
+ * A set of authorization labels that will be checked against the column visibility of each key in order to filter data. The authorizations passed in must be
+ * a subset of the accumulo user's set of authorizations. If the accumulo user has authorizations (A1, A2) and authorizations (A2, A3) are passed, then an
+ * exception will be thrown.
+ *
+ * <p>
+ * Any condition that is not visible with this set of authorizations will fail.
+ * @param auths the authorizations to use; passed through {@code ArgumentChecker.notNull} before being stored
+ * @return {@code this} to allow chaining of set methods
+ */
+ public ConditionalWriterConfig setAuthorizations(Authorizations auths) {
+ ArgumentChecker.notNull(auths);
+ this.auths = auths;
+ return this;
+ }
+
+ /**
+ * Sets the maximum amount of time an unresponsive server will be re-tried. When this timeout is exceeded, the {@link ConditionalWriter} should return the
+ * mutation with an exception.<br />
+ * For no timeout, set to zero, or {@link Long#MAX_VALUE} with {@link TimeUnit#MILLISECONDS}.
+ *
+ * <p>
+ * {@link TimeUnit#MICROSECONDS} or {@link TimeUnit#NANOSECONDS} will be truncated to the nearest {@link TimeUnit#MILLISECONDS}.<br />
+ * If this truncation would result in making the value zero when it was specified as non-zero, then a minimum value of one {@link TimeUnit#MILLISECONDS} will
+ * be used.
+ *
+ * <p>
+ * <b>Default:</b> {@link Long#MAX_VALUE} (no timeout)
+ *
+ * @param timeout
+ * the timeout, in the unit specified by the value of {@code timeUnit}
+ * @param timeUnit
+ * determines how {@code timeout} will be interpreted
+ * @throws IllegalArgumentException
+ * if {@code timeout} is less than 0
+ * @return {@code this} to allow chaining of set methods
+ */
+ public ConditionalWriterConfig setTimeout(long timeout, TimeUnit timeUnit) {
+ if (timeout < 0)
+ throw new IllegalArgumentException("Negative timeout not allowed " + timeout);
+
+ if (timeout == 0)
+ this.timeout = Long.MAX_VALUE; // zero is stored as Long.MAX_VALUE, the "no timeout" value
+ else
+ // make small, positive values that truncate to 0 when converted use the minimum millis instead
+ this.timeout = Math.max(1, timeUnit.toMillis(timeout));
+ return this;
+ }
+
+ /**
+ * Sets the maximum number of threads to use for writing data to the tablet servers.
+ *
+ * <p>
+ * <b>Default:</b> 3
+ *
+ * @param maxWriteThreads
+ * the maximum threads to use
+ * @throws IllegalArgumentException
+ * if {@code maxWriteThreads} is non-positive
+ * @return {@code this} to allow chaining of set methods
+ */
+ public ConditionalWriterConfig setMaxWriteThreads(int maxWriteThreads) {
+ if (maxWriteThreads <= 0)
+ throw new IllegalArgumentException("Max threads must be positive " + maxWriteThreads);
+
+ this.maxWriteThreads = maxWriteThreads;
+ return this;
+ }
+
+ public Authorizations getAuthorizations() { // returns the configured authorizations; Authorizations.EMPTY unless setAuthorizations was called
+ return auths;
+ }
+
+ public long getTimeout(TimeUnit timeUnit) { // returns the timeout converted from the stored milliseconds into timeUnit; DEFAULT_TIMEOUT if never set
+ return timeUnit.convert(timeout != null ? timeout : DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+
+ public int getMaxWriteThreads() { // returns the configured thread count; DEFAULT_MAX_WRITE_THREADS if never set
+ return maxWriteThreads != null ? maxWriteThreads : DEFAULT_MAX_WRITE_THREADS;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/core/src/main/java/org/apache/accumulo/core/client/Connector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/Connector.java b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
index 45a8162..bbfa55f 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/Connector.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
@@ -178,16 +178,15 @@ public abstract class Connector {
*
* @param tableName
* the name of the table to query data from
- * @param authorizations
- * A set of authorization labels that will be checked against the column visibility of each key in order to filter data. The authorizations passed in
- * must be a subset of the accumulo user's set of authorizations. If the accumulo user has authorizations (A1, A2) and authorizations (A2, A3) are
- * passed, then an exception will be thrown.
+ * @param config
+ * configuration used to create conditional writer
*
* @return ConditionalWriter object for writing ConditionalMutations
* @throws TableNotFoundException
* when the specified table doesn't exist
+ * @since 1.6.0
*/
- public abstract ConditionalWriter createConditionalWriter(String tableName, Authorizations authorizations) throws TableNotFoundException;
+ public abstract ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) throws TableNotFoundException;
/**
* Accessor method for internal instance object.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
index ed20054..55aa718 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@ -37,12 +37,15 @@ import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.TimedOutException;
import org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Condition;
import org.apache.accumulo.core.data.ConditionalMutation;
@@ -94,6 +97,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
private TCredentials credentials;
private TabletLocator locator;
private String tableId;
+ private long timeout;
private static class ServerQueue {
BlockingQueue<TabletServerMutations<QCMutation>> queue = new LinkedBlockingQueue<TabletServerMutations<QCMutation>>();
@@ -125,7 +129,6 @@ class ConditionalWriterImpl implements ConditionalWriter {
throw new NoSuchElementException();
try {
- // TODO maybe call drainTo after take() to get a batch efficiently
Result result = rq.poll(1, TimeUnit.SECONDS);
while (result == null) {
@@ -153,12 +156,14 @@ class ConditionalWriterImpl implements ConditionalWriter {
private BlockingQueue<Result> resultQueue;
private long resetTime;
private long delay = 50;
+ private long entryTime;
- QCMutation(ConditionalMutation cm, BlockingQueue<Result> resultQueue) {
+ QCMutation(ConditionalMutation cm, BlockingQueue<Result> resultQueue, long entryTime) {
super(cm);
this.resultQueue = resultQueue;
+ this.entryTime = entryTime;
}
-
+
@Override
public int compareTo(Delayed o) {
QCMutation oqcm = (QCMutation) o;
@@ -171,7 +176,6 @@ class ConditionalWriterImpl implements ConditionalWriter {
}
void resetDelay() {
- // TODO eventually timeout a mutation
delay = Math.min(delay * 2, MAX_SLEEP);
resetTime = System.currentTimeMillis();
}
@@ -190,12 +194,37 @@ class ConditionalWriterImpl implements ConditionalWriter {
return serverQueue;
}
- private void queueRetry(List<QCMutation> mutations) {
- for (QCMutation qcm : mutations) {
- qcm.resetDelay();
- }
+ private void queueRetry(List<QCMutation> mutations, String server) { // re-queues failed mutations for retry, or fails them with a TimedOutException; server may be null
- failedMutations.addAll(mutations);
+ if (timeout < Long.MAX_VALUE) { // a finite timeout is configured, so each mutation must be checked before it is retried
+
+ long time = System.currentTimeMillis();
+
+ ArrayList<QCMutation> mutations2 = new ArrayList<ConditionalWriterImpl.QCMutation>(mutations.size()); // mutations that are still allowed to retry
+
+ for (QCMutation qcm : mutations) {
+ qcm.resetDelay();
+ if (time + qcm.getDelay(TimeUnit.MILLISECONDS) > qcm.entryTime + timeout) { // presumably: the next retry would run after entryTime + timeout. NOTE(review): entryTime + timeout can overflow for finite timeouts near Long.MAX_VALUE - confirm
+ TimedOutException toe;
+ if (server != null)
+ toe = new TimedOutException(Collections.singleton(server));
+ else
+ toe = new TimedOutException("Conditional mutation timed out");
+
+ qcm.resultQueue.add(new Result(toe, qcm, server)); // hand the timeout to the caller through the mutation's own result queue
+ } else {
+ mutations2.add(qcm);
+ }
+ }
+
+ if (mutations2.size() > 0)
+ failedMutations.addAll(mutations2);
+
+ } else { // no timeout configured: every mutation is re-queued
+ for (QCMutation qcm : mutations)
+ qcm.resetDelay();
+ failedMutations.addAll(mutations);
+ }
}
private void queue(List<QCMutation> mutations) {
@@ -221,7 +250,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
}
if (failures.size() > 0)
- queueRetry(failures);
+ queueRetry(failures, null);
for (Entry<String,TabletServerMutations<QCMutation>> entry : binnedMutations.entrySet()) {
queue(entry.getKey(), entry.getValue());
@@ -293,17 +322,17 @@ class ConditionalWriterImpl implements ConditionalWriter {
}
}
- ConditionalWriterImpl(Instance instance, TCredentials credentials, String tableId, Authorizations authorizations) {
+ ConditionalWriterImpl(Instance instance, TCredentials credentials, String tableId, ConditionalWriterConfig config) {
this.instance = instance;
this.credentials = credentials;
- this.auths = authorizations;
- this.ve = new VisibilityEvaluator(authorizations);
- // TODO make configurable
- this.threadPool = new ScheduledThreadPoolExecutor(3);
- this.threadPool.setMaximumPoolSize(3);
+ this.auths = config.getAuthorizations();
+ this.ve = new VisibilityEvaluator(config.getAuthorizations());
+ this.threadPool = new ScheduledThreadPoolExecutor(config.getMaxWriteThreads());
+ this.threadPool.setMaximumPoolSize(config.getMaxWriteThreads());
this.locator = TabletLocator.getLocator(instance, new Text(tableId));
this.serverQueues = new HashMap<String,ServerQueue>();
this.tableId = tableId;
+ this.timeout = config.getTimeout(TimeUnit.MILLISECONDS);
Runnable failureHandler = new Runnable() {
@@ -328,6 +357,8 @@ class ConditionalWriterImpl implements ConditionalWriter {
int count = 0;
+ long entryTime = System.currentTimeMillis();
+
mloop: while (mutations.hasNext()) {
// TODO stop reading from iterator if too much memory
ConditionalMutation mut = mutations.next();
@@ -341,7 +372,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
}
// copy the mutations so that even if caller changes it, it will not matter
- mutationList.add(new QCMutation(mut, resultQueue));
+ mutationList.add(new QCMutation(mut, resultQueue, entryTime));
}
queue(mutationList);
@@ -438,6 +469,15 @@ class ConditionalWriterImpl implements ConditionalWriter {
}
}
+ private TabletClientService.Iface getClient(String location) throws TTransportException { // obtains a tablet server client for location via ThriftUtil
+ TabletClientService.Iface client;
+ if (timeout < instance.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)) // the writer's timeout is shorter than the configured general RPC timeout
+ client = ThriftUtil.getTServerClient(location, timeout); // pass the writer's timeout explicitly
+ else
+ client = ThriftUtil.getTServerClient(location, instance.getConfiguration()); // otherwise let the instance configuration supply the settings
+ return client;
+ }
+
private void sendToServer(String location, TabletServerMutations<QCMutation> mutations) {
TabletClientService.Iface client = null;
@@ -449,7 +489,8 @@ class ConditionalWriterImpl implements ConditionalWriter {
Long sessionId = null;
try {
- client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
+
+ client = getClient(location);
Map<TKeyExtent,List<TConditionalMutation>> tmutations = new HashMap<TKeyExtent,List<TConditionalMutation>>();
@@ -486,7 +527,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
locator.invalidateCache(ke);
}
- queueRetry(ignored);
+ queueRetry(ignored, location);
} catch (ThriftSecurityException tse) {
AccumuloSecurityException ase = new AccumuloSecurityException(credentials.getPrincipal(), tse.getCode(), Tables.getPrintableTableInfoFromId(instance,
@@ -508,11 +549,12 @@ class ConditionalWriterImpl implements ConditionalWriter {
}
}
- private void queueRetry(Map<Long,CMK> cmidToCm) {
+
+ private void queueRetry(Map<Long,CMK> cmidToCm, String location) {
ArrayList<QCMutation> ignored = new ArrayList<QCMutation>();
for (CMK cmk : cmidToCm.values())
ignored.add(cmk.cm);
- queueRetry(ignored);
+ queueRetry(ignored, location);
}
private void queueException(String location, Map<Long,CMK> cmidToCm, Exception e) {
@@ -522,7 +564,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
private void invalidateSession(String location, TabletServerMutations<QCMutation> mutations, Map<Long,CMK> cmidToCm, Long sessionId) {
if(sessionId == null){
- queueRetry(cmidToCm);
+ queueRetry(cmidToCm, location);
}else{
try {
invalidateSession(sessionId, location, mutations);
@@ -587,7 +629,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
TInfo tinfo = Tracer.traceInfo();
try {
- client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
+ client = getClient(location);
client.invalidateConditionalUpdate(tinfo, sessionId);
} finally {
ThriftUtil.returnClient((TServiceClient) client);
@@ -678,14 +720,6 @@ class ConditionalWriterImpl implements ConditionalWriter {
return write(Collections.singleton(mutation).iterator()).next();
}
- public void setTimeout(long timeOut, TimeUnit timeUnit) {
- throw new UnsupportedOperationException();
- }
-
- public long getTimeout(TimeUnit timeUnit) {
- throw new UnsupportedOperationException();
- }
-
@Override
public void close() {
//TODO could possible close cached sessions using async method to clean up sessions on server side
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
index 693f3c9..57e36fd 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
@@ -25,6 +25,7 @@ import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
@@ -131,10 +132,8 @@ public class ConnectorImpl extends Connector {
}
@Override
- public ConditionalWriter createConditionalWriter(String tableName, Authorizations authorizations) throws TableNotFoundException {
- ArgumentChecker.notNull(tableName, authorizations);
- // TODO resolve table name to table id here and pass that
- return new ConditionalWriterImpl(instance, credentials, getTableId(tableName), authorizations);
+ public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) throws TableNotFoundException {
+ return new ConditionalWriterImpl(instance, credentials, getTableId(tableName), config);
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
index 4a405aa..4af2ea5 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
@@ -23,6 +23,7 @@ import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
@@ -130,7 +131,7 @@ public class MockConnector extends Connector {
}
@Override
- public ConditionalWriter createConditionalWriter(String tableName, Authorizations authorizations) throws TableNotFoundException {
+ public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) throws TableNotFoundException {
// TODO add implementation
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index 4f7ba92..c1a1fc3 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -1933,7 +1933,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID);
- if(cs == null)
+ if (cs == null || cs.interruptFlag.get())
throw new NoSuchScanIDException();
Text tid = new Text(cs.tableId);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java b/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
index de56218..7e7480f 100644
--- a/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
+++ b/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Random;
-import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.data.ConditionalMutation;
@@ -74,14 +73,6 @@ public class FaultyConditionalWriter implements ConditionalWriter {
return write(Collections.singleton(mutation).iterator()).next();
}
- public void setTimeout(long timeOut, TimeUnit timeUnit) {
- cw.setTimeout(timeOut, timeUnit);
- }
-
- public long getTimeout(TimeUnit timeUnit) {
- return cw.getTimeout(timeUnit);
- }
-
@Override
public void close() {
cw.close();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
index a71b1ad..03eaefb 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
@@ -17,10 +17,13 @@
package org.apache.accumulo.test.functional;
import java.io.IOException;
+import java.util.Collection;
import java.util.Map;
import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
@@ -30,13 +33,19 @@ import org.apache.accumulo.core.util.UtilWaitThread;
public class SlowIterator extends WrappingIterator {
static private final String SLEEP_TIME = "sleepTime";
+ static private final String SEEK_SLEEP_TIME = "seekSleepTime";
- long sleepTime;
+ private long sleepTime = 0;
+ private long seekSleepTime = 0;
public static void setSleepTime(IteratorSetting is, long millis) {
is.addOption(SLEEP_TIME, Long.toString(millis));
}
+ public static void setSeekSleepTime(IteratorSetting is, long t) { // records t under the "seekSleepTime" option, which init() parses into seekSleepTime
+ is.addOption(SEEK_SLEEP_TIME, Long.toString(t));
+ }
+
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
throw new UnsupportedOperationException();
@@ -49,9 +58,20 @@ public class SlowIterator extends WrappingIterator {
}
@Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { // sleeps, then delegates the seek to the wrapped iterator
+ UtilWaitThread.sleep(seekSleepTime); // seekSleepTime stays 0 unless the "seekSleepTime" option was supplied to init()
+ super.seek(range, columnFamilies, inclusive);
+ }
+
+ @Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
super.init(source, options, env);
- sleepTime = Long.parseLong(options.get(SLEEP_TIME));
+ if (options.containsKey(SLEEP_TIME))
+ sleepTime = Long.parseLong(options.get(SLEEP_TIME));
+
+ if (options.containsKey(SEEK_SLEEP_TIME))
+ seekSleepTime = Long.parseLong(options.get(SEEK_SLEEP_TIME));
}
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
index 65a5636..66b699e 100644
--- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
@@ -41,6 +41,7 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.ConditionalWriter.Result;
import org.apache.accumulo.core.client.ConditionalWriter.Status;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.IteratorSetting;
@@ -74,6 +75,7 @@ import org.apache.accumulo.examples.simple.constraints.AlphaNumKeyConstraint;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.accumulo.minicluster.MiniAccumuloConfig;
import org.apache.accumulo.test.functional.BadIterator;
+import org.apache.accumulo.test.functional.SlowIterator;
import org.apache.hadoop.io.Text;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -107,7 +109,7 @@ public class ConditionalWriterTest {
conn.tableOperations().create("foo");
- ConditionalWriter cw = conn.createConditionalWriter("foo", Authorizations.EMPTY);
+ ConditionalWriter cw = conn.createConditionalWriter("foo", new ConditionalWriterConfig());
// mutation conditional on column tx:seq not exiting
ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq"));
@@ -190,7 +192,7 @@ public class ConditionalWriterTest {
conn.securityOperations().changeUserAuthorizations("root", auths);
- ConditionalWriter cw = conn.createConditionalWriter(table, auths);
+ ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig().setAuthorizations(auths));
ColumnVisibility cva = new ColumnVisibility("A");
ColumnVisibility cvb = new ColumnVisibility("B");
@@ -278,7 +280,7 @@ public class ConditionalWriterTest {
Authorizations filteredAuths = new Authorizations("A");
- ConditionalWriter cw = conn.createConditionalWriter(table, filteredAuths);
+ ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig().setAuthorizations(filteredAuths));
ColumnVisibility cva = new ColumnVisibility("A");
ColumnVisibility cvb = new ColumnVisibility("B");
@@ -340,6 +342,25 @@ public class ConditionalWriterTest {
Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm7).getStatus());
cw.close();
+
+ // test passing auths that exceed users configured auths
+
+ Authorizations exceedingAuths = new Authorizations("A", "B", "D");
+ ConditionalWriter cw2 = conn.createConditionalWriter(table, new ConditionalWriterConfig().setAuthorizations(exceedingAuths));
+
+ ConditionalMutation cm8 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb), new Condition("tx", "seq").setVisibility(cva)
+ .setValue("1"));
+ cm8.put("name", "last", cva, "doe");
+ cm8.put("name", "first", cva, "john");
+ cm8.put("tx", "seq", cva, "1");
+
+ try {
+ cw2.write(cm8).getStatus();
+ Assert.assertTrue(false);
+ } catch (AccumuloSecurityException ase) {}
+
+
+ cw2.close();
}
@Test
@@ -356,7 +377,7 @@ public class ConditionalWriterTest {
Scanner scanner = conn.createScanner(table + "_clone", new Authorizations());
- ConditionalWriter cw = conn.createConditionalWriter(table + "_clone", new Authorizations());
+ ConditionalWriter cw = conn.createConditionalWriter(table + "_clone", new ConditionalWriterConfig());
ConditionalMutation cm0 = new ConditionalMutation("99006+", new Condition("tx", "seq"));
cm0.put("tx", "seq", "1");
@@ -421,7 +442,7 @@ public class ConditionalWriterTest {
Assert.assertEquals("3", scanner.iterator().next().getValue().toString());
- ConditionalWriter cw = conn.createConditionalWriter(table, new Authorizations());
+ ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig());
ConditionalMutation cm0 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setValue("3"));
cm0.put("count", "comments", "1");
@@ -504,7 +525,7 @@ public class ConditionalWriterTest {
cm2.put("tx", "seq", cvab, "1");
mutations.add(cm2);
- ConditionalWriter cw = conn.createConditionalWriter(table, new Authorizations("A"));
+ ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig().setAuthorizations(new Authorizations("A")));
Iterator<Result> results = cw.write(mutations.iterator());
int count = 0;
while (results.hasNext()) {
@@ -611,7 +632,7 @@ public class ConditionalWriterTest {
cml.add(cm);
}
- ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY);
+ ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig());
Iterator<Result> results = cw.write(cml.iterator());
@@ -704,7 +725,7 @@ public class ConditionalWriterTest {
cm3.put("tx", "seq", cvaob, "2");
mutations.add(cm3);
- ConditionalWriter cw = conn.createConditionalWriter(table, new Authorizations("A"));
+ ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig().setAuthorizations(new Authorizations("A")));
Iterator<Result> results = cw.write(mutations.iterator());
HashSet<String> rows = new HashSet<String>();
while (results.hasNext()) {
@@ -745,7 +766,7 @@ public class ConditionalWriterTest {
conn.tableOperations().create(table);
- ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY);
+ ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig());
ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq"));
cm1.put("tx", "seq", "1");
@@ -942,7 +963,7 @@ public class ConditionalWriterTest {
break;
}
- ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY);
+ ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig());
ArrayList<ByteSequence> rows = new ArrayList<ByteSequence>();
@@ -1026,9 +1047,9 @@ public class ConditionalWriterTest {
cm1.put("tx", "seq", "1");
cm1.put("data", "x", "a");
- ConditionalWriter cw1 = conn2.createConditionalWriter("sect1", Authorizations.EMPTY);
- ConditionalWriter cw2 = conn2.createConditionalWriter("sect2", Authorizations.EMPTY);
- ConditionalWriter cw3 = conn2.createConditionalWriter("sect3", Authorizations.EMPTY);
+ ConditionalWriter cw1 = conn2.createConditionalWriter("sect1", new ConditionalWriterConfig());
+ ConditionalWriter cw2 = conn2.createConditionalWriter("sect2", new ConditionalWriterConfig());
+ ConditionalWriter cw3 = conn2.createConditionalWriter("sect3", new ConditionalWriterConfig());
Assert.assertEquals(Status.ACCEPTED, cw3.write(cm1).getStatus());
@@ -1050,8 +1071,54 @@ public class ConditionalWriterTest {
@Test
- public void testTimeout() {
- // TODO
+ public void testTimeout() throws Exception {
+ ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+ Connector conn = zki.getConnector("root", new PasswordToken(secret));
+ // scenario: a conditional write that outlasts the writer timeout must report UNKNOWN and must not change the row
+ String table = "fooT";
+
+ conn.tableOperations().create(table);
+ // the writer under test is configured with a one second timeout
+ ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig().setTimeout(1, TimeUnit.SECONDS));
+
+ ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq"));
+ cm1.put("tx", "seq", "1");
+ cm1.put("data", "x", "a");
+ // baseline: without a slow iterator the first write is expected to be accepted
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
+ // attach an iterator whose seek sleep time (4000, presumably ms - confirm in SlowIterator) exceeds the writer timeout
+ IteratorSetting is = new IteratorSetting(5, SlowIterator.class);
+ SlowIterator.setSeekSleepTime(is, 4000);
+
+ ConditionalMutation cm2 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1").setIterators(is));
+ cm2.put("tx", "seq", "2");
+ cm2.put("data", "x", "b");
+ // the write whose condition check is slowed down is expected to come back as UNKNOWN
+ Assert.assertEquals(Status.UNKNOWN, cw.write(cm2).getStatus());
+ // the row must still hold exactly the two values written by cm1
+ Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
+
+ for (Entry<Key,Value> entry : scanner) {
+ String cf = entry.getKey().getColumnFamilyData().toString();
+ String cq = entry.getKey().getColumnQualifierData().toString();
+ String val = entry.getValue().toString();
+
+ if (cf.equals("tx") && cq.equals("seq"))
+ Assert.assertEquals("1", val);
+ else if (cf.equals("data") && cq.equals("x"))
+ Assert.assertEquals("a", val);
+ else
+ Assert.fail("unexpected column " + cf + ":" + cq);
+ }
+ // the same condition without the slow iterator is expected to be accepted by the same writer
+ ConditionalMutation cm3 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1"));
+ cm3.put("tx", "seq", "2");
+ cm3.put("data", "x", "b");
+
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm3).getStatus());
+
+ cw.close();
+
}
@Test
@@ -1062,13 +1129,13 @@ public class ConditionalWriterTest {
Connector conn = zki.getConnector("root", new PasswordToken(secret));
try {
- conn.createConditionalWriter(table, Authorizations.EMPTY);
+ conn.createConditionalWriter(table, new ConditionalWriterConfig());
Assert.assertFalse(true);
} catch (TableNotFoundException e) {}
conn.tableOperations().create(table);
- ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY);
+ ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig());
conn.tableOperations().delete(table);
@@ -1081,8 +1148,8 @@ public class ConditionalWriterTest {
try {
result.getStatus();
Assert.assertFalse(true);
- } catch (TableDeletedException ae) {
-
+ } catch (AccumuloException ae) {
+ Assert.assertEquals(TableDeletedException.class, ae.getCause().getClass());
}
}
@@ -1096,7 +1163,7 @@ public class ConditionalWriterTest {
conn.tableOperations().create(table);
- ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY);
+ ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig());
conn.tableOperations().offline(table);
@@ -1111,14 +1178,14 @@ public class ConditionalWriterTest {
try {
result.getStatus();
Assert.assertFalse(true);
- } catch (TableOfflineException ae) {
-
+ } catch (AccumuloException ae) {
+ Assert.assertEquals(TableOfflineException.class, ae.getCause().getClass());
}
cw.close();
try {
- conn.createConditionalWriter(table, Authorizations.EMPTY);
+ conn.createConditionalWriter(table, new ConditionalWriterConfig());
Assert.assertFalse(true);
} catch (TableOfflineException e) {}
}
@@ -1140,7 +1207,7 @@ public class ConditionalWriterTest {
conn.tableOperations().create(table);
- ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY);
+ ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig());
IteratorSetting iterSetting = new IteratorSetting(5, BadIterator.class);