You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/07/23 18:55:14 UTC

[49/50] git commit: ACCUMULO-1000 Added timeout & config to conditional writer. Added unit test

ACCUMULO-1000 Added timeout & config to conditional writer.  Added unit test


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/79019ef0
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/79019ef0
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/79019ef0

Branch: refs/heads/ACCUMULO-1000
Commit: 79019ef0477b76966e2aff7259443aa9cd2f1cce
Parents: 5183ae4
Author: Keith Turner <kt...@apache.org>
Authored: Tue Jul 23 12:07:41 2013 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Tue Jul 23 12:11:06 2013 -0400

----------------------------------------------------------------------
 .../accumulo/core/client/ConditionalWriter.java |  35 ++----
 .../core/client/ConditionalWriterConfig.java    | 118 +++++++++++++++++++
 .../apache/accumulo/core/client/Connector.java  |   9 +-
 .../core/client/impl/ConditionalWriterImpl.java |  96 ++++++++++-----
 .../core/client/impl/ConnectorImpl.java         |   7 +-
 .../core/client/mock/MockConnector.java         |   3 +-
 .../server/tabletserver/TabletServer.java       |   2 +-
 .../accumulo/test/FaultyConditionalWriter.java  |   9 --
 .../accumulo/test/functional/SlowIterator.java  |  24 +++-
 .../accumulo/test/ConditionalWriterTest.java    | 115 ++++++++++++++----
 10 files changed, 313 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
index b434463..db29492 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
@@ -18,8 +18,8 @@
 package org.apache.accumulo.core.client;
 
 import java.util.Iterator;
-import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
 import org.apache.accumulo.core.data.ConditionalMutation;
 
 /**
@@ -48,11 +48,11 @@ public interface ConditionalWriter {
     public Status getStatus() throws AccumuloException, AccumuloSecurityException {
       if (status == null) {
         if (exception instanceof AccumuloException)
-          throw (AccumuloException) exception;
-        if (exception instanceof AccumuloSecurityException)
-          throw (AccumuloSecurityException) exception;
-        if (exception instanceof RuntimeException)
-          throw (RuntimeException) exception;
+          throw new AccumuloException(exception);
+        if (exception instanceof AccumuloSecurityException) {
+          AccumuloSecurityException ase = (AccumuloSecurityException) exception;
+          throw new AccumuloSecurityException(ase.getUser(), SecurityErrorCode.valueOf(ase.getSecurityErrorCode().name()), ase.getTableInfo(), ase);
+        }
         else
           throw new AccumuloException(exception);
       }
@@ -94,33 +94,12 @@ public interface ConditionalWriter {
      * A condition contained a column visibility that could never be seen
      */
     INVISIBLE_VISIBILITY,
-    /**
-     * nothing was done with this mutation, this is caused by previous mutations failing in some way like timing out
-     */
-    IGNORED
+
   }
 
   public abstract Iterator<Result> write(Iterator<ConditionalMutation> mutations);
   
   public abstract Result write(ConditionalMutation mutation);
-  
-  /**
-   * This setting determines how long a scanner will automatically retry when a failure occurs. By default a scanner will retry forever.
-   * 
-   * Setting to zero or Long.MAX_VALUE and TimeUnit.MILLISECONDS means to retry forever.
-   * 
-   * @param timeOut
-   * @param timeUnit
-   *          determines how timeout is interpreted
-   */
-  public void setTimeout(long timeOut, TimeUnit timeUnit);
-  
-  /**
-   * Returns the setting for how long a scanner will automatically retry when a failure occurs.
-   * 
-   * @return the timeout configured for this scanner
-   */
-  public long getTimeout(TimeUnit timeUnit);
 
   public void close();
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
new file mode 100644
index 0000000..f2a91ea
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.ArgumentChecker;
+
+/**
+ * 
+ * @since 1.6.0
+ */
+public class ConditionalWriterConfig {
+  
+  private static final Long DEFAULT_TIMEOUT = Long.MAX_VALUE;
+  private Long timeout = null;
+  
+  private static final Integer DEFAULT_MAX_WRITE_THREADS = 3;
+  private Integer maxWriteThreads = null;
+  
+  private Authorizations auths = Authorizations.EMPTY;
+  
+  /**
+   * A set of authorization labels that will be checked against the column visibility of each key in order to filter data. The authorizations passed in must be
+   * a subset of the accumulo user's set of authorizations. If the accumulo user has authorizations (A1, A2) and authorizations (A2, A3) are passed, then an
+   * exception will be thrown.
+   * 
+   * <p>
+   * Any condition that is not visible with this set of authorizations will fail.
+   * 
+   * @param auths
+   */
+  public ConditionalWriterConfig setAuthorizations(Authorizations auths) {
+    ArgumentChecker.notNull(auths);
+    this.auths = auths;
+    return this;
+  }
+  
+  /**
+   * Sets the maximum amount of time an unresponsive server will be re-tried. When this timeout is exceeded, the {@link ConditionalWriter} should return the
+   * mutation with an exception.<br />
+   * For no timeout, set to zero, or {@link Long#MAX_VALUE} with {@link TimeUnit#MILLISECONDS}.
+   * 
+   * <p>
+   * {@link TimeUnit#MICROSECONDS} or {@link TimeUnit#NANOSECONDS} will be truncated to the nearest {@link TimeUnit#MILLISECONDS}.<br />
+   * If this truncation would result in making the value zero when it was specified as non-zero, then a minimum value of one {@link TimeUnit#MILLISECONDS} will
+   * be used.
+   * 
+   * <p>
+   * <b>Default:</b> {@link Long#MAX_VALUE} (no timeout)
+   * 
+   * @param timeout
+   *          the timeout, in the unit specified by the value of {@code timeUnit}
+   * @param timeUnit
+   *          determines how {@code timeout} will be interpreted
+   * @throws IllegalArgumentException
+   *           if {@code timeout} is less than 0
+   * @return {@code this} to allow chaining of set methods
+   */
+  public ConditionalWriterConfig setTimeout(long timeout, TimeUnit timeUnit) {
+    if (timeout < 0)
+      throw new IllegalArgumentException("Negative timeout not allowed " + timeout);
+    
+    if (timeout == 0)
+      this.timeout = Long.MAX_VALUE;
+    else
+      // make small, positive values that truncate to 0 when converted use the minimum millis instead
+      this.timeout = Math.max(1, timeUnit.toMillis(timeout));
+    return this;
+  }
+  
+  /**
+   * Sets the maximum number of threads to use for writing data to the tablet servers.
+   * 
+   * <p>
+   * <b>Default:</b> 3
+   * 
+   * @param maxWriteThreads
+   *          the maximum threads to use
+   * @throws IllegalArgumentException
+   *           if {@code maxWriteThreads} is non-positive
+   * @return {@code this} to allow chaining of set methods
+   */
+  public ConditionalWriterConfig setMaxWriteThreads(int maxWriteThreads) {
+    if (maxWriteThreads <= 0)
+      throw new IllegalArgumentException("Max threads must be positive " + maxWriteThreads);
+    
+    this.maxWriteThreads = maxWriteThreads;
+    return this;
+  }
+  
+  public Authorizations getAuthorizations() {
+    return auths;
+  }
+
+  public long getTimeout(TimeUnit timeUnit) {
+    return timeUnit.convert(timeout != null ? timeout : DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
+  }
+  
+  public int getMaxWriteThreads() {
+    return maxWriteThreads != null ? maxWriteThreads : DEFAULT_MAX_WRITE_THREADS;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/core/src/main/java/org/apache/accumulo/core/client/Connector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/Connector.java b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
index 45a8162..bbfa55f 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/Connector.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
@@ -178,16 +178,15 @@ public abstract class Connector {
    * 
    * @param tableName
    *          the name of the table to query data from
-   * @param authorizations
-   *          A set of authorization labels that will be checked against the column visibility of each key in order to filter data. The authorizations passed in
-   *          must be a subset of the accumulo user's set of authorizations. If the accumulo user has authorizations (A1, A2) and authorizations (A2, A3) are
-   *          passed, then an exception will be thrown.
+   * @param config
+   *          configuration used to create conditional writer
    * 
    * @return ConditionalWriter object for writing ConditionalMutations
    * @throws TableNotFoundException
    *           when the specified table doesn't exist
+   * @since 1.6.0
    */
-  public abstract ConditionalWriter createConditionalWriter(String tableName, Authorizations authorizations) throws TableNotFoundException;
+  public abstract ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) throws TableNotFoundException;
 
   /**
    * Accessor method for internal instance object.

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
index ed20054..55aa718 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@ -37,12 +37,15 @@ import java.util.concurrent.TimeUnit;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.TableDeletedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.TimedOutException;
 import org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Condition;
 import org.apache.accumulo.core.data.ConditionalMutation;
@@ -94,6 +97,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
   private TCredentials credentials;
   private TabletLocator locator;
   private String tableId;
+  private long timeout;
 
   private static class ServerQueue {
     BlockingQueue<TabletServerMutations<QCMutation>> queue = new LinkedBlockingQueue<TabletServerMutations<QCMutation>>();
@@ -125,7 +129,6 @@ class ConditionalWriterImpl implements ConditionalWriter {
         throw new NoSuchElementException();
 
       try {
-        // TODO maybe call drainTo after take() to get a batch efficiently
         Result result = rq.poll(1, TimeUnit.SECONDS);
         while (result == null) {
           
@@ -153,12 +156,14 @@ class ConditionalWriterImpl implements ConditionalWriter {
     private BlockingQueue<Result> resultQueue;
     private long resetTime;
     private long delay = 50;
+    private long entryTime;
     
-    QCMutation(ConditionalMutation cm, BlockingQueue<Result> resultQueue) {
+    QCMutation(ConditionalMutation cm, BlockingQueue<Result> resultQueue, long entryTime) {
       super(cm);
       this.resultQueue = resultQueue;
+      this.entryTime = entryTime;
     }
-    
+
     @Override
     public int compareTo(Delayed o) {
       QCMutation oqcm = (QCMutation) o;
@@ -171,7 +176,6 @@ class ConditionalWriterImpl implements ConditionalWriter {
     }
     
     void resetDelay() {
-      // TODO eventually timeout a mutation
       delay = Math.min(delay * 2, MAX_SLEEP);
       resetTime = System.currentTimeMillis();
     }
@@ -190,12 +194,37 @@ class ConditionalWriterImpl implements ConditionalWriter {
     return serverQueue;
   }
   
-  private void queueRetry(List<QCMutation> mutations) {
-    for (QCMutation qcm : mutations) {
-      qcm.resetDelay();
-    }
+  private void queueRetry(List<QCMutation> mutations, String server) {
     
-    failedMutations.addAll(mutations);
+    if (timeout < Long.MAX_VALUE) {
+      
+      long time = System.currentTimeMillis();
+      
+      ArrayList<QCMutation> mutations2 = new ArrayList<ConditionalWriterImpl.QCMutation>(mutations.size());
+
+      for (QCMutation qcm : mutations) {
+        qcm.resetDelay();
+        if (time + qcm.getDelay(TimeUnit.MILLISECONDS) > qcm.entryTime + timeout) {
+          TimedOutException toe;
+          if (server != null)
+            toe = new TimedOutException(Collections.singleton(server));
+          else
+            toe = new TimedOutException("Conditional mutation timed out");
+          
+          qcm.resultQueue.add(new Result(toe, qcm, server));
+        } else {
+          mutations2.add(qcm);
+        }
+      }
+      
+      if (mutations2.size() > 0)
+        failedMutations.addAll(mutations2);
+
+    } else {
+      for (QCMutation qcm : mutations)
+        qcm.resetDelay();
+      failedMutations.addAll(mutations);
+    }
   }
 
   private void queue(List<QCMutation> mutations) {
@@ -221,7 +250,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
     }
     
     if (failures.size() > 0)
-      queueRetry(failures);
+      queueRetry(failures, null);
 
     for (Entry<String,TabletServerMutations<QCMutation>> entry : binnedMutations.entrySet()) {
       queue(entry.getKey(), entry.getValue());
@@ -293,17 +322,17 @@ class ConditionalWriterImpl implements ConditionalWriter {
     }
   }
 
-  ConditionalWriterImpl(Instance instance, TCredentials credentials, String tableId, Authorizations authorizations) {
+  ConditionalWriterImpl(Instance instance, TCredentials credentials, String tableId, ConditionalWriterConfig config) {
     this.instance = instance;
     this.credentials = credentials;
-    this.auths = authorizations;
-    this.ve = new VisibilityEvaluator(authorizations);
-    // TODO make configurable
-    this.threadPool = new ScheduledThreadPoolExecutor(3);
-    this.threadPool.setMaximumPoolSize(3);
+    this.auths = config.getAuthorizations();
+    this.ve = new VisibilityEvaluator(config.getAuthorizations());
+    this.threadPool = new ScheduledThreadPoolExecutor(config.getMaxWriteThreads());
+    this.threadPool.setMaximumPoolSize(config.getMaxWriteThreads());
     this.locator = TabletLocator.getLocator(instance, new Text(tableId));
     this.serverQueues = new HashMap<String,ServerQueue>();
     this.tableId = tableId;
+    this.timeout = config.getTimeout(TimeUnit.MILLISECONDS);
 
     Runnable failureHandler = new Runnable() {
       
@@ -328,6 +357,8 @@ class ConditionalWriterImpl implements ConditionalWriter {
 
     int count = 0;
 
+    long entryTime = System.currentTimeMillis();
+
     mloop: while (mutations.hasNext()) {
       // TODO stop reading from iterator if too much memory
       ConditionalMutation mut = mutations.next();
@@ -341,7 +372,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
       }
 
       // copy the mutations so that even if caller changes it, it will not matter
-      mutationList.add(new QCMutation(mut, resultQueue));
+      mutationList.add(new QCMutation(mut, resultQueue, entryTime));
     }
 
     queue(mutationList);
@@ -438,6 +469,15 @@ class ConditionalWriterImpl implements ConditionalWriter {
     }
   }
   
+  private TabletClientService.Iface getClient(String location) throws TTransportException {
+    TabletClientService.Iface client;
+    if (timeout < instance.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT))
+      client = ThriftUtil.getTServerClient(location, timeout);
+    else
+      client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
+    return client;
+  }
+
   private void sendToServer(String location, TabletServerMutations<QCMutation> mutations) {
     TabletClientService.Iface client = null;
     
@@ -449,7 +489,8 @@ class ConditionalWriterImpl implements ConditionalWriter {
     Long sessionId = null;
     
     try {
-      client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
+      
+      client = getClient(location);
 
       Map<TKeyExtent,List<TConditionalMutation>> tmutations = new HashMap<TKeyExtent,List<TConditionalMutation>>();
 
@@ -486,7 +527,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
         locator.invalidateCache(ke);
       }
 
-      queueRetry(ignored);
+      queueRetry(ignored, location);
 
     } catch (ThriftSecurityException tse) {
       AccumuloSecurityException ase = new AccumuloSecurityException(credentials.getPrincipal(), tse.getCode(), Tables.getPrintableTableInfoFromId(instance,
@@ -508,11 +549,12 @@ class ConditionalWriterImpl implements ConditionalWriter {
     }
   }
 
-  private void queueRetry(Map<Long,CMK> cmidToCm) {
+
+  private void queueRetry(Map<Long,CMK> cmidToCm, String location) {
     ArrayList<QCMutation> ignored = new ArrayList<QCMutation>();
     for (CMK cmk : cmidToCm.values())
     	ignored.add(cmk.cm);
-    queueRetry(ignored);
+    queueRetry(ignored, location);
   }
 
   private void queueException(String location, Map<Long,CMK> cmidToCm, Exception e) {
@@ -522,7 +564,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
 
   private void invalidateSession(String location, TabletServerMutations<QCMutation> mutations, Map<Long,CMK> cmidToCm, Long sessionId) {
     if(sessionId == null){
-      queueRetry(cmidToCm);
+      queueRetry(cmidToCm, location);
     }else{
       try {
         invalidateSession(sessionId, location, mutations);
@@ -587,7 +629,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
     TInfo tinfo = Tracer.traceInfo();
     
     try {
-      client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
+      client = getClient(location);
       client.invalidateConditionalUpdate(tinfo, sessionId);
     } finally {
       ThriftUtil.returnClient((TServiceClient) client);
@@ -678,14 +720,6 @@ class ConditionalWriterImpl implements ConditionalWriter {
     return write(Collections.singleton(mutation).iterator()).next();
   }
   
-  public void setTimeout(long timeOut, TimeUnit timeUnit) {
-    throw new UnsupportedOperationException();
-  }
-  
-  public long getTimeout(TimeUnit timeUnit) {
-    throw new UnsupportedOperationException();
-  }
-  
   @Override
   public void close() {
     //TODO could possible close cached sessions using async method to clean up sessions on server side

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
index 693f3c9..57e36fd 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
@@ -25,6 +25,7 @@ import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.MultiTableBatchWriter;
@@ -131,10 +132,8 @@ public class ConnectorImpl extends Connector {
   }
   
   @Override
-  public ConditionalWriter createConditionalWriter(String tableName, Authorizations authorizations) throws TableNotFoundException {
-    ArgumentChecker.notNull(tableName, authorizations);
-    // TODO resolve table name to table id here and pass that
-    return new ConditionalWriterImpl(instance, credentials, getTableId(tableName), authorizations);
+  public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) throws TableNotFoundException {
+    return new ConditionalWriterImpl(instance, credentials, getTableId(tableName), config);
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
index 4a405aa..4af2ea5 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
@@ -23,6 +23,7 @@ import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.MultiTableBatchWriter;
@@ -130,7 +131,7 @@ public class MockConnector extends Connector {
   }
   
   @Override
-  public ConditionalWriter createConditionalWriter(String tableName, Authorizations authorizations) throws TableNotFoundException {
+  public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) throws TableNotFoundException {
     // TODO add implementation
     throw new UnsupportedOperationException();
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index 4f7ba92..c1a1fc3 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -1933,7 +1933,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       
       ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID);
       
-      if(cs == null)
+      if (cs == null || cs.interruptFlag.get())
         throw new NoSuchScanIDException();
       
       Text tid = new Text(cs.tableId);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java b/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
index de56218..7e7480f 100644
--- a/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
+++ b/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Random;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.ConditionalWriter;
 import org.apache.accumulo.core.data.ConditionalMutation;
@@ -74,14 +73,6 @@ public class FaultyConditionalWriter implements ConditionalWriter {
     return write(Collections.singleton(mutation).iterator()).next();
   }
   
-  public void setTimeout(long timeOut, TimeUnit timeUnit) {
-    cw.setTimeout(timeOut, timeUnit);
-  }
-  
-  public long getTimeout(TimeUnit timeUnit) {
-    return cw.getTimeout(timeUnit);
-  }
-  
   @Override
   public void close() {
     cw.close();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
index a71b1ad..03eaefb 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
@@ -17,10 +17,13 @@
 package org.apache.accumulo.test.functional;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Map;
 
 import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
@@ -30,13 +33,19 @@ import org.apache.accumulo.core.util.UtilWaitThread;
 public class SlowIterator extends WrappingIterator {
 
   static private final String SLEEP_TIME = "sleepTime";
+  static private final String SEEK_SLEEP_TIME = "seekSleepTime";
   
-  long sleepTime;
+  private long sleepTime = 0;
+  private long seekSleepTime = 0;
   
   public static void setSleepTime(IteratorSetting is, long millis) {
     is.addOption(SLEEP_TIME, Long.toString(millis));  
   }
   
+  public static void setSeekSleepTime(IteratorSetting is, long t) {
+    is.addOption(SEEK_SLEEP_TIME, Long.toString(t));
+  }
+
   @Override
   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
     throw new UnsupportedOperationException();
@@ -49,9 +58,20 @@ public class SlowIterator extends WrappingIterator {
   }
   
   @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    UtilWaitThread.sleep(seekSleepTime);
+    super.seek(range, columnFamilies, inclusive);
+  }
+  
+  @Override
   public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
     super.init(source, options, env);
-    sleepTime = Long.parseLong(options.get(SLEEP_TIME));
+    if (options.containsKey(SLEEP_TIME))
+      sleepTime = Long.parseLong(options.get(SLEEP_TIME));
+    
+    if (options.containsKey(SEEK_SLEEP_TIME))
+      seekSleepTime = Long.parseLong(options.get(SEEK_SLEEP_TIME));
   }
+
   
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
index 65a5636..66b699e 100644
--- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
@@ -41,6 +41,7 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.ConditionalWriter;
 import org.apache.accumulo.core.client.ConditionalWriter.Result;
 import org.apache.accumulo.core.client.ConditionalWriter.Status;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.IsolatedScanner;
 import org.apache.accumulo.core.client.IteratorSetting;
@@ -74,6 +75,7 @@ import org.apache.accumulo.examples.simple.constraints.AlphaNumKeyConstraint;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.accumulo.minicluster.MiniAccumuloConfig;
 import org.apache.accumulo.test.functional.BadIterator;
+import org.apache.accumulo.test.functional.SlowIterator;
 import org.apache.hadoop.io.Text;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -107,7 +109,7 @@ public class ConditionalWriterTest {
     
     conn.tableOperations().create("foo");
 
-    ConditionalWriter cw = conn.createConditionalWriter("foo", Authorizations.EMPTY);
+    ConditionalWriter cw = conn.createConditionalWriter("foo", new ConditionalWriterConfig());
     
     // mutation conditional on column tx:seq not exiting
     ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq"));
@@ -190,7 +192,7 @@ public class ConditionalWriterTest {
     
     conn.securityOperations().changeUserAuthorizations("root", auths);
     
-    ConditionalWriter cw = conn.createConditionalWriter(table, auths);
+    ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig().setAuthorizations(auths));
     
     ColumnVisibility cva = new ColumnVisibility("A");
     ColumnVisibility cvb = new ColumnVisibility("B");
@@ -278,7 +280,7 @@ public class ConditionalWriterTest {
 
     Authorizations filteredAuths = new Authorizations("A");
     
-    ConditionalWriter cw = conn.createConditionalWriter(table, filteredAuths);
+    ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig().setAuthorizations(filteredAuths));
     
     ColumnVisibility cva = new ColumnVisibility("A");
     ColumnVisibility cvb = new ColumnVisibility("B");
@@ -340,6 +342,25 @@ public class ConditionalWriterTest {
     Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm7).getStatus());
     
     cw.close();
+
+    // test passing auths that exceed users configured auths
+    
+    Authorizations exceedingAuths = new Authorizations("A", "B", "D");
+    ConditionalWriter cw2 = conn.createConditionalWriter(table, new ConditionalWriterConfig().setAuthorizations(exceedingAuths));
+    
+    ConditionalMutation cm8 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb), new Condition("tx", "seq").setVisibility(cva)
+        .setValue("1"));
+    cm8.put("name", "last", cva, "doe");
+    cm8.put("name", "first", cva, "john");
+    cm8.put("tx", "seq", cva, "1");
+
+    try {
+      cw2.write(cm8).getStatus();
+      Assert.assertTrue(false);
+    } catch (AccumuloSecurityException ase) {}
+    
+
+    cw2.close();
   }
   
   @Test
@@ -356,7 +377,7 @@ public class ConditionalWriterTest {
     
     Scanner scanner = conn.createScanner(table + "_clone", new Authorizations());
 
-    ConditionalWriter cw = conn.createConditionalWriter(table + "_clone", new Authorizations());
+    ConditionalWriter cw = conn.createConditionalWriter(table + "_clone", new ConditionalWriterConfig());
 
     ConditionalMutation cm0 = new ConditionalMutation("99006+", new Condition("tx", "seq"));
     cm0.put("tx", "seq", "1");
@@ -421,7 +442,7 @@ public class ConditionalWriterTest {
     
     Assert.assertEquals("3", scanner.iterator().next().getValue().toString());
 
-    ConditionalWriter cw = conn.createConditionalWriter(table, new Authorizations());
+    ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig());
     
     ConditionalMutation cm0 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setValue("3"));
     cm0.put("count", "comments", "1");
@@ -504,7 +525,7 @@ public class ConditionalWriterTest {
     cm2.put("tx", "seq", cvab, "1");
     mutations.add(cm2);
     
-    ConditionalWriter cw = conn.createConditionalWriter(table, new Authorizations("A"));
+    ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig().setAuthorizations(new Authorizations("A")));
     Iterator<Result> results = cw.write(mutations.iterator());
     int count = 0;
     while (results.hasNext()) {
@@ -611,7 +632,7 @@ public class ConditionalWriterTest {
       cml.add(cm);
     }
 
-    ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY);
+    ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig());
 
     Iterator<Result> results = cw.write(cml.iterator());
 
@@ -704,7 +725,7 @@ public class ConditionalWriterTest {
     cm3.put("tx", "seq", cvaob, "2");
     mutations.add(cm3);
 
-    ConditionalWriter cw = conn.createConditionalWriter(table, new Authorizations("A"));
+    ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig().setAuthorizations(new Authorizations("A")));
     Iterator<Result> results = cw.write(mutations.iterator());
     HashSet<String> rows = new HashSet<String>();
     while (results.hasNext()) {
@@ -745,7 +766,7 @@ public class ConditionalWriterTest {
     
     conn.tableOperations().create(table);
     
-    ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY);
+    ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig());
     
     ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq"));
     cm1.put("tx", "seq", "1");
@@ -942,7 +963,7 @@ public class ConditionalWriterTest {
         break;
     }
     
-    ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY);
+    ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig());
     
     ArrayList<ByteSequence> rows = new ArrayList<ByteSequence>();
 
@@ -1026,9 +1047,9 @@ public class ConditionalWriterTest {
     cm1.put("tx", "seq", "1");
     cm1.put("data", "x", "a");
     
-    ConditionalWriter cw1 = conn2.createConditionalWriter("sect1", Authorizations.EMPTY);
-    ConditionalWriter cw2 = conn2.createConditionalWriter("sect2", Authorizations.EMPTY);
-    ConditionalWriter cw3 = conn2.createConditionalWriter("sect3", Authorizations.EMPTY);
+    ConditionalWriter cw1 = conn2.createConditionalWriter("sect1", new ConditionalWriterConfig());
+    ConditionalWriter cw2 = conn2.createConditionalWriter("sect2", new ConditionalWriterConfig());
+    ConditionalWriter cw3 = conn2.createConditionalWriter("sect3", new ConditionalWriterConfig());
     
     Assert.assertEquals(Status.ACCEPTED, cw3.write(cm1).getStatus());
     
@@ -1050,8 +1071,54 @@ public class ConditionalWriterTest {
 
 
   @Test
-  public void testTimeout() {
-    // TODO
+  public void testTimeout() throws Exception {
+    ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector conn = zki.getConnector("root", new PasswordToken(secret));
+    
+    String table = "fooT";
+    
+    conn.tableOperations().create(table);
+
+    ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig().setTimeout(1, TimeUnit.SECONDS));
+
+    ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq"));
+    cm1.put("tx", "seq", "1");
+    cm1.put("data", "x", "a");
+    
+    Assert.assertEquals(cw.write(cm1).getStatus(), Status.ACCEPTED);
+    
+    IteratorSetting is = new IteratorSetting(5, SlowIterator.class);
+    SlowIterator.setSeekSleepTime(is, 4000);
+    
+    ConditionalMutation cm2 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1").setIterators(is));
+    cm2.put("tx", "seq", "2");
+    cm2.put("data", "x", "b");
+    
+    Assert.assertEquals(cw.write(cm2).getStatus(), Status.UNKNOWN);
+    
+    Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
+    
+    for (Entry<Key,Value> entry : scanner) {
+      String cf = entry.getKey().getColumnFamilyData().toString();
+      String cq = entry.getKey().getColumnQualifierData().toString();
+      String val = entry.getValue().toString();
+      
+      if (cf.equals("tx") && cq.equals("seq"))
+        Assert.assertEquals("1", val);
+      else if (cf.equals("data") && cq.equals("x"))
+        Assert.assertEquals("a", val);
+      else
+        Assert.assertTrue(false);
+    }
+    
+    ConditionalMutation cm3 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1"));
+    cm3.put("tx", "seq", "2");
+    cm3.put("data", "x", "b");
+    
+    Assert.assertEquals(cw.write(cm3).getStatus(), Status.ACCEPTED);
+    
+    cw.close();
+
   }
 
   @Test
@@ -1062,13 +1129,13 @@ public class ConditionalWriterTest {
     Connector conn = zki.getConnector("root", new PasswordToken(secret));
     
     try {
-      conn.createConditionalWriter(table, Authorizations.EMPTY);
+      conn.createConditionalWriter(table, new ConditionalWriterConfig());
       Assert.assertFalse(true);
     } catch (TableNotFoundException e) {}
     
     conn.tableOperations().create(table);
     
-    ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY);
+    ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig());
     
     conn.tableOperations().delete(table);
     
@@ -1081,8 +1148,8 @@ public class ConditionalWriterTest {
     try {
       result.getStatus();
       Assert.assertFalse(true);
-    } catch (TableDeletedException ae) {
-      
+    } catch (AccumuloException ae) {
+      Assert.assertEquals(TableDeletedException.class, ae.getCause().getClass());
     }
     
   }
@@ -1096,7 +1163,7 @@ public class ConditionalWriterTest {
     
     conn.tableOperations().create(table);
     
-    ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY);
+    ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig());
     
     conn.tableOperations().offline(table);
 
@@ -1111,14 +1178,14 @@ public class ConditionalWriterTest {
     try {
       result.getStatus();
       Assert.assertFalse(true);
-    } catch (TableOfflineException ae) {
-      
+    } catch (AccumuloException ae) {
+      Assert.assertEquals(TableOfflineException.class, ae.getCause().getClass());
     }
     
     cw.close();
     
     try {
-      conn.createConditionalWriter(table, Authorizations.EMPTY);
+      conn.createConditionalWriter(table, new ConditionalWriterConfig());
       Assert.assertFalse(true);
     } catch (TableOfflineException e) {}
   }
@@ -1140,7 +1207,7 @@ public class ConditionalWriterTest {
     
     conn.tableOperations().create(table);
     
-    ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY);
+    ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig());
     
     IteratorSetting iterSetting = new IteratorSetting(5, BadIterator.class);