You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2016/10/13 08:13:05 UTC

hbase git commit: HBASE-16664 Timeout logic in AsyncProcess is broken

Repository: hbase
Updated Branches:
  refs/heads/master f11aa4542 -> 88ff71b91


HBASE-16664 Timeout logic in AsyncProcess is broken

Signed-off-by: chenheng <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/88ff71b9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/88ff71b9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/88ff71b9

Branch: refs/heads/master
Commit: 88ff71b91b086984fdc5b8707d134a1d475e5103
Parents: f11aa45
Author: Phil Yang <ud...@gmail.com>
Authored: Sun Oct 9 15:25:11 2016 +0800
Committer: chenheng <ch...@apache.org>
Committed: Thu Oct 13 16:15:43 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       |  38 +++--
 .../hbase/client/AsyncRequestFutureImpl.java    |  64 +++----
 .../hadoop/hbase/client/BufferedMutator.java    |  10 ++
 .../hbase/client/BufferedMutatorImpl.java       |  20 ++-
 .../client/CancellableRegionServerCallable.java |  22 ++-
 .../hbase/client/ConnectionImplementation.java  |   8 +-
 .../org/apache/hadoop/hbase/client/HTable.java  |  48 +++---
 .../hadoop/hbase/client/HTableMultiplexer.java  |   6 +-
 .../hbase/client/MultiServerCallable.java       |  15 +-
 .../client/NoncedRegionServerCallable.java      |   2 +-
 .../hbase/client/RetryingTimeTracker.java       |   3 +-
 .../RpcRetryingCallerWithReadReplicas.java      |  14 +-
 .../hadoop/hbase/client/TestAsyncProcess.java   |  39 +++--
 .../hbase/client/HConnectionTestingUtility.java |   4 +-
 .../org/apache/hadoop/hbase/client/TestHCM.java | 167 +++++++++++++++++--
 15 files changed, 338 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index f2d9546..abefc46 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.hbase.client;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
@@ -55,8 +57,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * This class  allows a continuous flow of requests. It's written to be compatible with a
  * synchronous caller such as HTable.
@@ -212,7 +212,8 @@ class AsyncProcess {
   protected final long pause;
   protected int numTries;
   protected int serverTrackerTimeout;
-  protected int timeout;
+  protected int rpcTimeout;
+  protected int operationTimeout;
   protected long primaryCallTimeoutMicroseconds;
   /** Whether to log details for batch errors */
   protected final boolean logBatchErrorDetails;
@@ -220,7 +221,7 @@ class AsyncProcess {
 
   public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
       RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors,
-      RpcControllerFactory rpcFactory, int rpcTimeout) {
+      RpcControllerFactory rpcFactory, int rpcTimeout, int operationTimeout) {
     if (hc == null) {
       throw new IllegalArgumentException("ClusterConnection cannot be null.");
     }
@@ -236,7 +237,8 @@ class AsyncProcess {
     // how many times we could try in total, one more than retry number
     this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
-    this.timeout = rpcTimeout;
+    this.rpcTimeout = rpcTimeout;
+    this.operationTimeout = operationTimeout;
     this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
 
     this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
@@ -434,7 +436,7 @@ class AsyncProcess {
       List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
       ExecutorService pool) {
     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
-      tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, timeout);
+      tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, -1);
     // Add location errors if any
     if (locationErrors != null) {
       for (int i = 0; i < locationErrors.size(); ++i) {
@@ -448,6 +450,14 @@ class AsyncProcess {
     return ars;
   }
 
+  public void setRpcTimeout(int rpcTimeout) {
+    this.rpcTimeout = rpcTimeout;
+  }
+
+  public void setOperationTimeout(int operationTimeout) {
+    this.operationTimeout = operationTimeout;
+  }
+
   /**
    * Helper that is used when grouping the actions per region server.
    *
@@ -473,7 +483,7 @@ class AsyncProcess {
 
   public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
-    return submitAll(pool, tableName, rows, callback, results, null, timeout);
+    return submitAll(pool, tableName, rows, callback, results, null, -1);
   }
   /**
    * Submit immediately the list of rows, whatever the server status. Kept for backward
@@ -484,10 +494,11 @@ class AsyncProcess {
    * @param rows the list of rows.
    * @param callback the callback.
    * @param results Optional array to return the results thru; backward compat.
+   * @param rpcTimeout rpc timeout for this batch, set -1 if want to use current setting.
    */
   public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
-      CancellableRegionServerCallable callable, int curTimeout) {
+      CancellableRegionServerCallable callable, int rpcTimeout) {
     List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
 
     // The position will be used by the processBatch to match the object array returned.
@@ -507,7 +518,7 @@ class AsyncProcess {
     }
     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
         tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null,
-        callable, curTimeout);
+        callable, rpcTimeout);
     ars.groupAndSendMultiAction(actions, 1);
     return ars;
   }
@@ -520,10 +531,11 @@ class AsyncProcess {
   protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
       TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
       Batch.Callback<CResult> callback, Object[] results, boolean needResults,
-      CancellableRegionServerCallable callable, int curTimeout) {
+      CancellableRegionServerCallable callable, int rpcTimeout) {
     return new AsyncRequestFutureImpl<CResult>(
         tableName, actions, nonceGroup, getPool(pool), needResults,
-        results, callback, callable, curTimeout, this);
+        results, callback, callable, operationTimeout,
+        rpcTimeout > 0 ? rpcTimeout : this.rpcTimeout, this);
   }
 
   /** Wait until the async does not have more than max tasks in progress. */
@@ -664,8 +676,8 @@ class AsyncProcess {
    */
   @VisibleForTesting
   protected RpcRetryingCaller<AbstractResponse> createCaller(
-      CancellableRegionServerCallable callable) {
-    return rpcCallerFactory.<AbstractResponse> newCaller();
+      CancellableRegionServerCallable callable, int rpcTimeout) {
+    return rpcCallerFactory.<AbstractResponse> newCaller(rpcTimeout);
   }
 
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index 3894d58..a2642f3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -20,6 +20,24 @@
 package org.apache.hadoop.hbase.client;
 
 import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -39,23 +57,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.htrace.Trace;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * The context, and return value, for a single submit/submitAll call.
  * Note on how this class (one AP submit) works. Initially, all requests are split into groups
@@ -70,6 +71,8 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
 
   private static final Log LOG = LogFactory.getLog(AsyncRequestFutureImpl.class);
 
+  private RetryingTimeTracker tracker;
+
   /**
    * Runnable (that can be submitted to thread pool) that waits for when it's time
    * to issue replica calls, finds region replicas, groups the requests by replica and
@@ -219,12 +222,12 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
         if (callable == null) {
           callable = createCallable(server, tableName, multiAction);
         }
-        RpcRetryingCaller<AbstractResponse> caller = asyncProcess.createCaller(callable);
+        RpcRetryingCaller<AbstractResponse> caller = asyncProcess.createCaller(callable,rpcTimeout);
         try {
           if (callsInProgress != null) {
             callsInProgress.add(callable);
           }
-          res = caller.callWithoutRetries(callable, currentCallTotalTimeout);
+          res = caller.callWithoutRetries(callable, operationTimeout);
           if (res == null) {
             // Cancelled
             return;
@@ -297,7 +300,8 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
   private final boolean hasAnyReplicaGets;
   private final long nonceGroup;
   private CancellableRegionServerCallable currentCallable;
-  private int currentCallTotalTimeout;
+  private int operationTimeout;
+  private int rpcTimeout;
   private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
   protected AsyncProcess asyncProcess;
 
@@ -337,10 +341,9 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
 
 
   public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
-                                ExecutorService pool, boolean needResults, Object[] results,
-                                Batch.Callback<CResult> callback,
-                                CancellableRegionServerCallable callable, int timeout,
-                                AsyncProcess asyncProcess) {
+      ExecutorService pool, boolean needResults, Object[] results, Batch.Callback<CResult> callback,
+      CancellableRegionServerCallable callable, int operationTimeout, int rpcTimeout,
+      AsyncProcess asyncProcess) {
     this.pool = pool;
     this.callback = callback;
     this.nonceGroup = nonceGroup;
@@ -410,9 +413,12 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
     this.errorsByServer = createServerErrorTracker();
     this.errors = (asyncProcess.globalErrors != null)
         ? asyncProcess.globalErrors : new BatchErrors();
+    this.operationTimeout = operationTimeout;
+    this.rpcTimeout = rpcTimeout;
     this.currentCallable = callable;
-    this.currentCallTotalTimeout = timeout;
-
+    if (callable == null) {
+      tracker = new RetryingTimeTracker().start();
+    }
   }
 
   @VisibleForTesting
@@ -1281,9 +1287,9 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
   /**
    * Create a callable. Isolated to be easily overridden in the tests.
    */
-  private MultiServerCallable<Row> createCallable(final ServerName server,
-                                                    TableName tableName, final MultiAction<Row> multi) {
+  private MultiServerCallable<Row> createCallable(final ServerName server, TableName tableName,
+      final MultiAction<Row> multi) {
     return new MultiServerCallable<Row>(asyncProcess.connection, tableName, server,
-        multi, asyncProcess.rpcFactory.newController());
+        multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
index 5dc7fc3..fcc9af7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
@@ -120,6 +120,16 @@ public interface BufferedMutator extends Closeable {
   long getWriteBufferSize();
 
   /**
+   * Set rpc timeout for this mutator instance
+   */
+  void setRpcTimeout(int timeout);
+
+  /**
+   * Set operation timeout for this mutator instance
+   */
+  void setOperationTimeout(int timeout);
+
+  /**
    * Listens for asynchronous exceptions on a {@link BufferedMutator}.
    */
   @InterfaceAudience.Public

http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 2d4c8b3..f7eb09d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -82,6 +82,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
   private boolean closed = false;
   private final ExecutorService pool;
   private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
+  private int operationTimeout;
 
   @VisibleForTesting
   protected AsyncProcess ap; // non-final so can be overridden in test
@@ -107,9 +108,12 @@ public class BufferedMutatorImpl implements BufferedMutator {
     this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
         conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
             HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
-
+    this.operationTimeout = conn.getConfiguration().getInt(
+        HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
     // puts need to track errors globally due to how the APIs currently work.
-    ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout);
+    ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory,
+        writeRpcTimeout, operationTimeout);
   }
 
   @Override
@@ -282,6 +286,18 @@ public class BufferedMutatorImpl implements BufferedMutator {
     return this.writeBufferSize;
   }
 
+  @Override
+  public void setRpcTimeout(int timeout) {
+    this.writeRpcTimeout = timeout;
+    ap.setRpcTimeout(timeout);
+  }
+
+  @Override
+  public void setOperationTimeout(int timeout) {
+    this.operationTimeout = timeout;
+    ap.setOperationTimeout(operationTimeout);
+  }
+
   private class QueueRowAccess implements RowAccess<Row> {
     private int remainder = undealtMutationCount.getAndSet(0);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
index 69f5b55..a0ff900 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
@@ -30,15 +30,20 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
  * This class is used to unify HTable calls with AsyncProcess Framework. HTable can use
  * AsyncProcess directly though this class. Also adds global timeout tracking on top of
  * RegionServerCallable and implements Cancellable.
+ * Global timeout tracking conflicts with logic in RpcRetryingCallerImpl's callWithRetries. So you
+ * can only use this callable in AsyncProcess which only uses callWithoutRetries and retries in its
+ * own implementation.
  */
 @InterfaceAudience.Private
 abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable<T> implements
     Cancellable {
-  private final RetryingTimeTracker tracker = new RetryingTimeTracker();
-
+  private final RetryingTimeTracker tracker;
+  private final int rpcTimeout;
   CancellableRegionServerCallable(Connection connection, TableName tableName, byte[] row,
-      RpcController rpcController) {
+      RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker) {
     super(connection, tableName, row, rpcController);
+    this.rpcTimeout = rpcTimeout;
+    this.tracker = tracker;
   }
 
   /* Override so can mess with the callTimeout.
@@ -46,7 +51,7 @@ abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable<
    * @see org.apache.hadoop.hbase.client.RegionServerCallable#rpcCall(int)
    */
   @Override
-  public T call(int callTimeout) throws IOException {
+  public T call(int operationTimeout) throws IOException {
     if (isCancelled()) return null;
     if (Thread.interrupted()) {
       throw new InterruptedIOException();
@@ -54,11 +59,12 @@ abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable<
     // It is expected (it seems) that tracker.start can be called multiple times (on each trip
     // through the call when retrying). Also, we can call start and no need of a stop.
     this.tracker.start();
-    int remainingTime = tracker.getRemainingTime(callTimeout);
-    if (remainingTime == 0) {
-      throw new DoNotRetryIOException("Timeout for mutate row");
+    int remainingTime = tracker.getRemainingTime(operationTimeout);
+    if (remainingTime <= 1) {
+      // "1" is a special return value in RetryingTimeTracker, see its implementation.
+      throw new DoNotRetryIOException("Operation rpcTimeout");
     }
-    return super.call(remainingTime);
+    return super.call(Math.min(rpcTimeout, remainingTime));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 8db9dbf..9cf63dc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -1831,8 +1831,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
   // For tests to override.
   protected AsyncProcess createAsyncProcess(Configuration conf) {
     // No default pool available.
-    int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-    return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory, rpcTimeout);
+    int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+    int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
+    return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory,
+        rpcTimeout, operationTimeout);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 84f8024..2802a2c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -441,7 +441,7 @@ public class HTable implements Table {
     RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
         rpcControllerFactory, tableName, this.connection, get, pool,
         connConfiguration.getRetriesNumber(),
-        operationTimeout,
+        operationTimeout, readRpcTimeout,
         connConfiguration.getPrimaryCallTimeoutMicroSecond());
     return callable.call(operationTimeout);
   }
@@ -479,15 +479,10 @@ public class HTable implements Table {
     batch(actions, results, -1);
   }
 
-  public void batch(final List<? extends Row> actions, final Object[] results, int timeout)
+  public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout)
       throws InterruptedException, IOException {
-    AsyncRequestFuture ars = null;
-    if (timeout != -1) {
-      ars = multiAp.submitAll(pool, tableName, actions, null, results, null, timeout);
-    } else {
-      // use default timeout in AP
-      ars = multiAp.submitAll(pool, tableName, actions, null, results);
-    }
+    AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results, null,
+        rpcTimeout);
     ars.waitUntilDone();
     if (ars.hasError()) {
       throw ars.getErrors();
@@ -523,7 +518,8 @@ public class HTable implements Table {
   throws IOException {
     CancellableRegionServerCallable<SingleResponse> callable =
         new CancellableRegionServerCallable<SingleResponse>(
-            connection, getName(), delete.getRow(), this.rpcControllerFactory.newController()) {
+            connection, getName(), delete.getRow(), this.rpcControllerFactory.newController(),
+            writeRpcTimeout, new RetryingTimeTracker().start()) {
       @Override
       protected SingleResponse rpcCall() throws Exception {
         MutateRequest request = RequestConverter.buildMutateRequest(
@@ -535,7 +531,7 @@ public class HTable implements Table {
     List<Row> rows = new ArrayList<Row>();
     rows.add(delete);
     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
-        null, null, callable, operationTimeout);
+        null, null, callable, writeRpcTimeout);
     ars.waitUntilDone();
     if (ars.hasError()) {
       throw ars.getErrors();
@@ -593,7 +589,7 @@ public class HTable implements Table {
   public void mutateRow(final RowMutations rm) throws IOException {
     CancellableRegionServerCallable<MultiResponse> callable =
       new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
-          rpcControllerFactory.newController()) {
+          rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()){
       @Override
       protected MultiResponse rpcCall() throws Exception {
         RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
@@ -614,7 +610,7 @@ public class HTable implements Table {
       }
     };
     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
-        null, null, callable, operationTimeout);
+        null, null, callable, writeRpcTimeout);
     ars.waitUntilDone();
     if (ars.hasError()) {
       throw ars.getErrors();
@@ -798,7 +794,8 @@ public class HTable implements Table {
   throws IOException {
     CancellableRegionServerCallable<SingleResponse> callable =
         new CancellableRegionServerCallable<SingleResponse>(
-            this.connection, getName(), row, this.rpcControllerFactory.newController()) {
+            this.connection, getName(), row, this.rpcControllerFactory.newController(),
+            writeRpcTimeout, new RetryingTimeTracker().start()) {
       @Override
       protected SingleResponse rpcCall() throws Exception {
         CompareType compareType = CompareType.valueOf(compareOp.name());
@@ -814,7 +811,7 @@ public class HTable implements Table {
 
     Object[] results = new Object[1];
     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
-        null, results, callable, operationTimeout);
+        null, results, callable, -1);
     ars.waitUntilDone();
     if (ars.hasError()) {
       throw ars.getErrors();
@@ -831,7 +828,7 @@ public class HTable implements Table {
     throws IOException {
     CancellableRegionServerCallable<MultiResponse> callable =
       new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
-        rpcControllerFactory.newController()) {
+        rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()) {
         @Override
         protected MultiResponse rpcCall() throws Exception {
           CompareType compareType = CompareType.valueOf(compareOp.name());
@@ -858,7 +855,7 @@ public class HTable implements Table {
      * */
     Object[] results = new Object[rm.getMutations().size()];
     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
-      null, results, callable, operationTimeout);
+      null, results, callable, -1);
     ars.waitUntilDone();
     if (ars.hasError()) {
       throw ars.getErrors();
@@ -1117,6 +1114,10 @@ public class HTable implements Table {
   @Override
   public void setOperationTimeout(int operationTimeout) {
     this.operationTimeout = operationTimeout;
+    if (mutator != null) {
+      mutator.setOperationTimeout(operationTimeout);
+    }
+    multiAp.setOperationTimeout(operationTimeout);
   }
 
   @Override
@@ -1133,8 +1134,8 @@ public class HTable implements Table {
   @Override
   @Deprecated
   public void setRpcTimeout(int rpcTimeout) {
-    this.readRpcTimeout = rpcTimeout;
-    this.writeRpcTimeout = rpcTimeout;
+    setReadRpcTimeout(rpcTimeout);
+    setWriteRpcTimeout(rpcTimeout);
   }
 
   @Override
@@ -1145,6 +1146,10 @@ public class HTable implements Table {
   @Override
   public void setWriteRpcTimeout(int writeRpcTimeout) {
     this.writeRpcTimeout = writeRpcTimeout;
+    if (mutator != null) {
+      mutator.setRpcTimeout(writeRpcTimeout);
+    }
+    multiAp.setRpcTimeout(writeRpcTimeout);
   }
 
   @Override
@@ -1229,7 +1234,8 @@ public class HTable implements Table {
     AsyncProcess asyncProcess =
         new AsyncProcess(connection, configuration, pool,
             RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
-            true, RpcControllerFactory.instantiate(configuration), readRpcTimeout);
+            true, RpcControllerFactory.instantiate(configuration), readRpcTimeout,
+            operationTimeout);
 
     AsyncRequestFuture future = asyncProcess.submitAll(null, tableName, execs,
         new Callback<ClientProtos.CoprocessorServiceResult>() {
@@ -1281,6 +1287,8 @@ public class HTable implements Table {
               .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
       );
     }
+    mutator.setRpcTimeout(writeRpcTimeout);
+    mutator.setOperationTimeout(operationTimeout);
     return mutator;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index 2c1a61e..e8379ef 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -442,6 +442,7 @@ public class HTableMultiplexer {
     private final int maxRetryInQueue;
     private final AtomicInteger retryInQueue = new AtomicInteger(0);
     private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
+    private final int operationTimeout;
 
     public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
         HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
@@ -454,7 +455,10 @@ public class HTableMultiplexer {
       this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
           conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
               HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
-      this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, writeRpcTimeout);
+      this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+          HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
+      this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory,
+          writeRpcTimeout, operationTimeout);
       this.executor = executor;
       this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index 6067ef0..7d50a27 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -50,12 +50,13 @@ import com.google.common.annotations.VisibleForTesting;
  */
 @InterfaceAudience.Private
 class MultiServerCallable<R> extends CancellableRegionServerCallable<MultiResponse> {
-  private final MultiAction<R> multiAction;
-  private final boolean cellBlock;
+  private MultiAction<R> multiAction;
+  private boolean cellBlock;
 
   MultiServerCallable(final ClusterConnection connection, final TableName tableName,
-      final ServerName location, final MultiAction<R> multi, RpcController rpcController) {
-    super(connection, tableName, null, rpcController);
+      final ServerName location, final MultiAction<R> multi, RpcController rpcController,
+      int rpcTimeout, RetryingTimeTracker tracker) {
+    super(connection, tableName, null, rpcController, rpcTimeout, tracker);
     this.multiAction = multi;
     // RegionServerCallable has HRegionLocation field, but this is a multi-region request.
     // Using region info from parent HRegionLocation would be a mistake for this class; so
@@ -64,6 +65,12 @@ class MultiServerCallable<R> extends CancellableRegionServerCallable<MultiRespon
     this.cellBlock = isCellBlock();
   }
 
+  public void reset(ServerName location, MultiAction<R> multiAction) {
+    this.location = new HRegionLocation(null, location);
+    this.multiAction = multiAction;
+    this.cellBlock = isCellBlock();
+  }
+
   @Override
   protected HRegionLocation getLocation() {
     throw new RuntimeException("Cannot get region location for multi-region request");

http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
index aff0205..52ed263 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
  * @param <T> the class that the ServerCallable handles
  */
 @InterfaceAudience.Private
-public abstract class NoncedRegionServerCallable<T> extends CancellableRegionServerCallable<T> {
+public abstract class NoncedRegionServerCallable<T> extends ClientServiceCallable<T> {
   private final long nonce;
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
index b9438e6..e804e92 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
@@ -24,10 +24,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 class RetryingTimeTracker {
   private long globalStartTime = -1;
 
-  public void start() {
+  public RetryingTimeTracker start() {
     if (this.globalStartTime < 0) {
       this.globalStartTime = EnvironmentEdgeManager.currentTime();
     }
+    return this;
   }
 
   public int getRemainingTime(int callTimeout) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 04553d2..a290c78 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -58,7 +58,8 @@ public class RpcRetryingCallerWithReadReplicas {
   protected final Get get;
   protected final TableName tableName;
   protected final int timeBeforeReplicas;
-  private final int callTimeout;
+  private final int operationTimeout;
+  private final int rpcTimeout;
   private final int retries;
   private final RpcControllerFactory rpcControllerFactory;
   private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
@@ -66,7 +67,7 @@ public class RpcRetryingCallerWithReadReplicas {
   public RpcRetryingCallerWithReadReplicas(
       RpcControllerFactory rpcControllerFactory, TableName tableName,
       ClusterConnection cConnection, final Get get,
-      ExecutorService pool, int retries, int callTimeout,
+      ExecutorService pool, int retries, int operationTimeout, int rpcTimeout,
       int timeBeforeReplicas) {
     this.rpcControllerFactory = rpcControllerFactory;
     this.tableName = tableName;
@@ -75,7 +76,8 @@ public class RpcRetryingCallerWithReadReplicas {
     this.get = get;
     this.pool = pool;
     this.retries = retries;
-    this.callTimeout = callTimeout;
+    this.operationTimeout = operationTimeout;
+    this.rpcTimeout = rpcTimeout;
     this.timeBeforeReplicas = timeBeforeReplicas;
     this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf);
   }
@@ -91,7 +93,7 @@ public class RpcRetryingCallerWithReadReplicas {
     public ReplicaRegionServerCallable(int id, HRegionLocation location) {
       super(RpcRetryingCallerWithReadReplicas.this.cConnection,
           RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(),
-          rpcControllerFactory.newController());
+          rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker());
       this.id = id;
       this.location = location;
     }
@@ -133,7 +135,7 @@ public class RpcRetryingCallerWithReadReplicas {
       ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get);
       HBaseRpcController hrc = (HBaseRpcController)getRpcController();
       hrc.reset();
-      hrc.setCallTimeout(callTimeout);
+      hrc.setCallTimeout(rpcTimeout);
       hrc.setPriority(tableName);
       ClientProtos.GetResponse response = getStub().get(hrc, request);
       if (response == null) {
@@ -258,7 +260,7 @@ public class RpcRetryingCallerWithReadReplicas {
     for (int id = min; id <= max; id++) {
       HRegionLocation hrl = rl.getRegionLocation(id);
       ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
-      cs.submit(callOnReplica, callTimeout, id);
+      cs.submit(callOnReplica, operationTimeout, id);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 0703e51..ed521a3 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -152,7 +152,10 @@ public class TestAsyncProcess {
     final AtomicInteger nbActions = new AtomicInteger();
     public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
     public AtomicInteger callsCt = new AtomicInteger();
-    private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+    private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+    private static int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
     private long previousTimeout = -1;
     @Override
     protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
@@ -162,7 +165,7 @@ public class TestAsyncProcess {
       // Test HTable has tableName of null, so pass DUMMY_TABLE
       AsyncRequestFutureImpl<Res> r = new MyAsyncRequestFutureImpl<Res>(
           DUMMY_TABLE, actions, nonceGroup, getPool(pool), needResults,
-          results, callback, callable, curTimeout, this);
+          results, callback, callable, operationTimeout, rpcTimeout, this);
       allReqs.add(r);
       return r;
     }
@@ -174,14 +177,16 @@ public class TestAsyncProcess {
     public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
       super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
           new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
-            new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout);
+            new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout,
+          operationTimeout);
     }
 
     public MyAsyncProcess(
         ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
       super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
         new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())),
-          new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout);
+          new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf),
+          rpcTimeout, operationTimeout);
     }
 
     public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors,
@@ -193,7 +198,8 @@ public class TestAsyncProcess {
           throw new RejectedExecutionException("test under failure");
         }
       },
-          new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout);
+          new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf),
+          rpcTimeout, operationTimeout);
     }
 
     @Override
@@ -213,7 +219,7 @@ public class TestAsyncProcess {
     }
     @Override
     protected RpcRetryingCaller<AbstractResponse> createCaller(
-        CancellableRegionServerCallable callable) {
+        CancellableRegionServerCallable callable, int rpcTimeout) {
       callsCt.incrementAndGet();
       MultiServerCallable callable1 = (MultiServerCallable) callable;
       final MultiResponse mr = createMultiResponse(
@@ -254,12 +260,11 @@ public class TestAsyncProcess {
   static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> {
 
     public MyAsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
-                                  ExecutorService pool, boolean needResults, Object[] results,
-                                  Batch.Callback callback,
-                                  CancellableRegionServerCallable callable, int timeout,
-                                  AsyncProcess asyncProcess) {
+        ExecutorService pool, boolean needResults, Object[] results,
+        Batch.Callback callback, CancellableRegionServerCallable callable, int operationTimeout,
+        int rpcTimeout, AsyncProcess asyncProcess) {
       super(tableName, actions, nonceGroup, pool, needResults,
-          results, callback, callable, timeout, asyncProcess);
+          results, callback, callable, operationTimeout, rpcTimeout, asyncProcess);
     }
 
     @Override
@@ -299,7 +304,7 @@ public class TestAsyncProcess {
 
     @Override
     protected RpcRetryingCaller<AbstractResponse> createCaller(
-      CancellableRegionServerCallable callable) {
+      CancellableRegionServerCallable callable, int rpcTimeout) {
       callsCt.incrementAndGet();
       return new CallerWithFailure(ioe);
     }
@@ -351,7 +356,7 @@ public class TestAsyncProcess {
 
     @Override
     protected RpcRetryingCaller<AbstractResponse> createCaller(
-        CancellableRegionServerCallable payloadCallable) {
+        CancellableRegionServerCallable payloadCallable, int rpcTimeout) {
       MultiServerCallable<Row> callable = (MultiServerCallable) payloadCallable;
       final MultiResponse mr = createMultiResponse(
           callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
@@ -1638,12 +1643,14 @@ public class TestAsyncProcess {
   }
 
   static class AsyncProcessForThrowableCheck extends AsyncProcess {
-    private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-
+    private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+    private static int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
     public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf,
         ExecutorService pool) {
       super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(
-          conf), rpcTimeout);
+          conf), rpcTimeout, operationTimeout);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index 658fa96..ee89609 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -128,7 +128,9 @@ public class HConnectionTestingUtility {
     Mockito.when(c.getAsyncProcess()).thenReturn(
       new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false,
           RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
-              HConstants.DEFAULT_HBASE_RPC_TIMEOUT)));
+              HConstants.DEFAULT_HBASE_RPC_TIMEOUT), conf.getInt(
+                  HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+          HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)));
     Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
         RpcRetryingCallerFactory.instantiate(conf,
             RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null));

http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index f9ebc47..3416e54 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -135,12 +135,42 @@ public class TestHCM {
 
     @Override
     public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
-              final Get get, final List<Cell> results) throws IOException {
+        final Get get, final List<Cell> results) throws IOException {
+      Threads.sleep(sleepTime.get());
+      if (ct.incrementAndGet() == 1) {
+        throw new IOException("first call I fail");
+      }
+    }
+
+    @Override
+    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Put put, final WALEdit edit, final Durability durability) throws IOException {
       Threads.sleep(sleepTime.get());
-      if (ct.incrementAndGet() == 1){
+      if (ct.incrementAndGet() == 1) {
         throw new IOException("first call I fail");
       }
     }
+
+    @Override
+    public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Delete delete,
+        final WALEdit edit, final Durability durability) throws IOException {
+      Threads.sleep(sleepTime.get());
+      if (ct.incrementAndGet() == 1) {
+        throw new IOException("first call I fail");
+      }
+    }
+
+    @Override
+    public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Increment increment) throws IOException {
+      Threads.sleep(sleepTime.get());
+      if (ct.incrementAndGet() == 1) {
+        throw new IOException("first call I fail");
+      }
+      return super.preIncrement(e, increment);
+    }
+
   }
 
   public static class SleepCoprocessor extends BaseRegionObserver {
@@ -156,16 +186,20 @@ public class TestHCM {
         final Put put, final WALEdit edit, final Durability durability) throws IOException {
       Threads.sleep(SLEEP_TIME);
     }
-  }
 
-  public static class SleepWriteCoprocessor extends BaseRegionObserver {
-    public static final int SLEEP_TIME = 5000;
     @Override
     public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
                                final Increment increment) throws IOException {
       Threads.sleep(SLEEP_TIME);
       return super.preIncrement(e, increment);
     }
+
+    @Override
+    public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e, final Delete delete,
+        final WALEdit edit, final Durability durability) throws IOException {
+      Threads.sleep(SLEEP_TIME);
+    }
+
   }
 
   public static class SleepLongerAtFirstCoprocessor extends BaseRegionObserver {
@@ -364,11 +398,12 @@ public class TestHCM {
    * timeouted when the server answers.
    */
   @Test
-  public void testOperationTimeout() throws Exception {
-    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout");
+  public void testGetOperationTimeout() throws Exception {
+    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testGetOperationTimeout");
     hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
-    Table table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM});
+    Table table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration());
     table.setRpcTimeout(Integer.MAX_VALUE);
+    SleepAndFailFirstTime.ct.set(0);
     // Check that it works if the timeout is big enough
     table.setOperationTimeout(120 * 1000);
     table.get(new Get(FAM_NAM));
@@ -392,6 +427,64 @@ public class TestHCM {
   }
 
   @Test
+  public void testPutOperationTimeout() throws Exception {
+    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutOperationTimeout");
+    hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
+    Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM },TEST_UTIL.getConfiguration());
+    table.setRpcTimeout(Integer.MAX_VALUE);
+    SleepAndFailFirstTime.ct.set(0);
+    // Check that it works if the timeout is big enough
+    table.setOperationTimeout(120 * 1000);
+    table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM));
+
+    // Resetting and retrying. Will fail this time, not enough time for the second try
+    SleepAndFailFirstTime.ct.set(0);
+    try {
+      table.setOperationTimeout(30 * 1000);
+      table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM));
+      Assert.fail("We expect an exception here");
+    } catch (RetriesExhaustedWithDetailsException e) {
+      // The client has a CallTimeout class, but it's not shared.We're not very clean today,
+      //  in the general case you can expect the call to stop, but the exception may vary.
+      // In this test however, we're sure that it will be a socket timeout.
+      LOG.info("We received an exception, as expected ", e);
+    } catch (IOException e) {
+      Assert.fail("Wrong exception:" + e.getMessage());
+    } finally {
+      table.close();
+    }
+  }
+
+  @Test
+  public void testDeleteOperationTimeout() throws Exception {
+    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDeleteOperationTimeout");
+    hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
+    Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM },TEST_UTIL.getConfiguration());
+    table.setRpcTimeout(Integer.MAX_VALUE);
+    SleepAndFailFirstTime.ct.set(0);
+    // Check that it works if the timeout is big enough
+    table.setOperationTimeout(120 * 1000);
+    table.delete(new Delete(FAM_NAM));
+
+    // Resetting and retrying. Will fail this time, not enough time for the second try
+    SleepAndFailFirstTime.ct.set(0);
+    try {
+      table.setOperationTimeout(30 * 1000);
+      table.delete(new Delete(FAM_NAM));
+      Assert.fail("We expect an exception here");
+    } catch (RetriesExhaustedWithDetailsException e) {
+      // The client has a CallTimeout class, but it's not shared.We're not very clean today,
+      //  in the general case you can expect the call to stop, but the exception may vary.
+      // In this test however, we're sure that it will be a socket timeout.
+      LOG.info("We received an exception, as expected ", e);
+    } catch (IOException e) {
+      Assert.fail("Wrong exception:" + e.getMessage());
+    } finally {
+      table.close();
+    }
+  }
+
+  @Test
   public void testRpcTimeout() throws Exception {
     HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout");
     hdt.addCoprocessor(SleepCoprocessor.class.getName());
@@ -419,14 +512,14 @@ public class TestHCM {
   }
 
   @Test
-  public void testWriteRpcTimeout() throws Exception {
-    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testWriteRpcTimeout");
-    hdt.addCoprocessor(SleepWriteCoprocessor.class.getName());
+  public void testIncrementRpcTimeout() throws Exception {
+    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testIncrementRpcTimeout");
+    hdt.addCoprocessor(SleepCoprocessor.class.getName());
     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
 
     try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
-      t.setWriteRpcTimeout(SleepWriteCoprocessor.SLEEP_TIME / 2);
-      t.setOperationTimeout(SleepWriteCoprocessor.SLEEP_TIME * 100);
+      t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
+      t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
       Increment i = new Increment(FAM_NAM);
       i.addColumn(FAM_NAM, FAM_NAM, 1);
       t.increment(i);
@@ -436,7 +529,7 @@ public class TestHCM {
     }
 
     // Again, with configuration based override
-    c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepWriteCoprocessor.SLEEP_TIME / 2);
+    c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2);
     try (Connection conn = ConnectionFactory.createConnection(c)) {
       try (Table t = conn.getTable(hdt.getTableName())) {
         Increment i = new Increment(FAM_NAM);
@@ -450,8 +543,46 @@ public class TestHCM {
   }
 
   @Test
-  public void testReadRpcTimeout() throws Exception {
-    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testReadRpcTimeout");
+  public void testDeleteRpcTimeout() throws Exception {
+    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDeleteRpcTimeout");
+    hdt.addCoprocessor(SleepCoprocessor.class.getName());
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+
+    try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
+      t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
+      t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
+      Delete d = new Delete(FAM_NAM);
+      d.addColumn(FAM_NAM, FAM_NAM, 1);
+      t.delete(d);
+      fail("Write should not have succeeded");
+    } catch (RetriesExhaustedException e) {
+      // expected
+    }
+
+  }
+
+  @Test
+  public void testPutRpcTimeout() throws Exception {
+    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutRpcTimeout");
+    hdt.addCoprocessor(SleepCoprocessor.class.getName());
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+
+    try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
+      t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
+      t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
+      Put p = new Put(FAM_NAM);
+      p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM);
+      t.put(p);
+      fail("Write should not have succeeded");
+    } catch (RetriesExhaustedException e) {
+      // expected
+    }
+
+  }
+
+  @Test
+  public void testGetRpcTimeout() throws Exception {
+    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testGetRpcTimeout");
     hdt.addCoprocessor(SleepCoprocessor.class.getName());
     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
 
@@ -502,6 +633,7 @@ public class TestHCM {
     TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }).close();
 
     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    SleepAndFailFirstTime.ct.set(0);
     c.setInt(HConstants.HBASE_CLIENT_PAUSE, 3000);
     c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 4000);
 
@@ -1013,8 +1145,7 @@ public class TestHCM {
       curServer.getServerName().getPort(),
       conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
 
-    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
     table.close();
     connection.close();
   }