You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jy...@apache.org on 2016/03/05 20:16:17 UTC

[3/3] hbase git commit: HBASE-14703 HTable.mutateRow does not collect stats (Heng Chen)

HBASE-14703 HTable.mutateRow does not collect stats (Heng Chen)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ef712df9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ef712df9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ef712df9

Branch: refs/heads/master
Commit: ef712df944b0745892bc13bcecdfd6e358a71b66
Parents: d083e4f
Author: Jesse Yates <jy...@apache.org>
Authored: Fri Mar 4 19:07:59 2016 -0800
Committer: Jesse Yates <jy...@apache.org>
Committed: Sat Mar 5 11:01:45 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       |  114 +-
 .../org/apache/hadoop/hbase/client/HTable.java  |  151 +-
 .../hadoop/hbase/client/MetricsConnection.java  |   10 +-
 .../hadoop/hbase/client/MultiResponse.java      |   57 +-
 .../hbase/client/MultiServerCallable.java       |   17 +-
 .../client/PayloadCarryingServerCallable.java   |   48 +
 .../hadoop/hbase/client/ResultStatsUtil.java    |   12 +-
 .../hbase/client/RetryingTimeTracker.java       |   57 +
 .../hbase/client/RpcRetryingCallerFactory.java  |    6 -
 .../hbase/client/RpcRetryingCallerImpl.java     |   34 +-
 .../hbase/client/ServerStatisticTracker.java    |    3 +-
 .../hadoop/hbase/client/StatisticTrackable.java |   33 +
 .../client/StatsTrackingRpcRetryingCaller.java  |   77 -
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |    4 +-
 .../hbase/protobuf/ResponseConverter.java       |   28 +-
 .../hadoop/hbase/client/TestAsyncProcess.java   |   18 +-
 .../hbase/protobuf/generated/ClientProtos.java  | 1452 +++++++++++++++++-
 hbase-protocol/src/main/protobuf/Client.proto   |    8 +-
 .../hadoop/hbase/regionserver/HRegion.java      |    4 +-
 .../hbase/regionserver/RSRpcServices.java       |   68 +-
 .../hadoop/hbase/client/TestCheckAndMutate.java |    9 +-
 .../hadoop/hbase/client/TestClientPushback.java |   29 +
 .../hadoop/hbase/client/TestFromClientSide.java |    8 +-
 .../hadoop/hbase/client/TestReplicasClient.java |    5 +-
 24 files changed, 1891 insertions(+), 361 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 65c15ce..7b0016c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.htrace.Trace;
@@ -428,7 +429,7 @@ class AsyncProcess {
     if (retainedActions.isEmpty()) return NO_REQS_RESULT;
 
     return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
-      locationErrors, locationErrorRows, actionsByServer, pool);
+        locationErrors, locationErrorRows, actionsByServer, pool);
   }
 
   <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
@@ -444,7 +445,7 @@ class AsyncProcess {
         int originalIndex = locationErrorRows.get(i);
         Row row = retainedActions.get(originalIndex).getAction();
         ars.manageError(originalIndex, row,
-          Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
+            Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
       }
     }
     ars.sendMultiAction(actionsByServer, 1, null, false);
@@ -546,9 +547,13 @@ class AsyncProcess {
    */
   public <CResult> AsyncRequestFuture submitAll(TableName tableName,
       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
-    return submitAll(null, tableName, rows, callback, results);
+    return submitAll(null, tableName, rows, callback, results, null, timeout);
   }
 
+  public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
+      List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
+    return submitAll(pool, tableName, rows, callback, results, null, timeout);
+  }
   /**
    * Submit immediately the list of rows, whatever the server status. Kept for backward
    * compatibility: it allows to be used with the batch interface that return an array of objects.
@@ -560,7 +565,8 @@ class AsyncProcess {
    * @param results Optional array to return the results thru; backward compat.
    */
   public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
-      List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
+      List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
+      PayloadCarryingServerCallable callable, int curTimeout) {
     List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
 
     // The position will be used by the processBatch to match the object array returned.
@@ -579,7 +585,8 @@ class AsyncProcess {
       actions.add(action);
     }
     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
-        tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null);
+        tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null,
+        callable, curTimeout);
     ars.groupAndSendMultiAction(actions, 1);
     return ars;
   }
@@ -711,11 +718,11 @@ class AsyncProcess {
       private final MultiAction<Row> multiAction;
       private final int numAttempt;
       private final ServerName server;
-      private final Set<MultiServerCallable<Row>> callsInProgress;
+      private final Set<PayloadCarryingServerCallable> callsInProgress;
 
       private SingleServerRequestRunnable(
           MultiAction<Row> multiAction, int numAttempt, ServerName server,
-          Set<MultiServerCallable<Row>> callsInProgress) {
+          Set<PayloadCarryingServerCallable> callsInProgress) {
         this.multiAction = multiAction;
         this.numAttempt = numAttempt;
         this.server = server;
@@ -725,19 +732,22 @@ class AsyncProcess {
       @Override
       public void run() {
         MultiResponse res;
-        MultiServerCallable<Row> callable = null;
+        PayloadCarryingServerCallable callable = currentCallable;
         try {
-          callable = createCallable(server, tableName, multiAction);
+          // setup the callable based on the actions, if we don't have one already from the request
+          if (callable == null) {
+            callable = createCallable(server, tableName, multiAction);
+          }
+          RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
           try {
-            RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
-            if (callsInProgress != null) callsInProgress.add(callable);
-            res = caller.callWithoutRetries(callable, timeout);
-
+            if (callsInProgress != null) {
+              callsInProgress.add(callable);
+            }
+            res = caller.callWithoutRetries(callable, currentCallTotalTimeout);
             if (res == null) {
               // Cancelled
               return;
             }
-
           } catch (IOException e) {
             // The service itself failed . It may be an error coming from the communication
             //   layer, but, as well, a functional error raised by the server.
@@ -771,7 +781,7 @@ class AsyncProcess {
     private final BatchErrors errors;
     private final ConnectionImplementation.ServerErrorTracker errorsByServer;
     private final ExecutorService pool;
-    private final Set<MultiServerCallable<Row>> callsInProgress;
+    private final Set<PayloadCarryingServerCallable> callsInProgress;
 
 
     private final TableName tableName;
@@ -798,10 +808,12 @@ class AsyncProcess {
     private final int[] replicaGetIndices;
     private final boolean hasAnyReplicaGets;
     private final long nonceGroup;
+    private PayloadCarryingServerCallable currentCallable;
+    private int currentCallTotalTimeout;
 
     public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
         ExecutorService pool, boolean needResults, Object[] results,
-        Batch.Callback<CResult> callback) {
+        Batch.Callback<CResult> callback, PayloadCarryingServerCallable callable, int timeout) {
       this.pool = pool;
       this.callback = callback;
       this.nonceGroup = nonceGroup;
@@ -865,13 +877,16 @@ class AsyncProcess {
         this.replicaGetIndices = null;
       }
       this.callsInProgress = !hasAnyReplicaGets ? null :
-          Collections.newSetFromMap(new ConcurrentHashMap<MultiServerCallable<Row>, Boolean>());
+          Collections.newSetFromMap(
+              new ConcurrentHashMap<PayloadCarryingServerCallable, Boolean>());
 
       this.errorsByServer = createServerErrorTracker();
       this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
+      this.currentCallable = callable;
+      this.currentCallTotalTimeout = timeout;
     }
 
-    public Set<MultiServerCallable<Row>> getCallsInProgress() {
+    public Set<PayloadCarryingServerCallable> getCallsInProgress() {
       return callsInProgress;
     }
 
@@ -1275,11 +1290,15 @@ class AsyncProcess {
       int failureCount = 0;
       boolean canRetry = true;
 
-      // Go by original action.
+      Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
+      updateStats(server, results);
+
       int failed = 0, stopped = 0;
+      // Go by original action.
       for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) {
         byte[] regionName = regionEntry.getKey();
-        Map<Integer, Object> regionResults = responses.getResults().get(regionName);
+        Map<Integer, Object> regionResults = results.get(regionName) == null
+            ?  null : results.get(regionName).result;
         if (regionResults == null) {
           if (!responses.getExceptions().containsKey(regionName)) {
             LOG.error("Server sent us neither results nor exceptions for "
@@ -1308,7 +1327,7 @@ class AsyncProcess {
             }
             ++failureCount;
             Retry retry = manageError(sentAction.getOriginalIndex(), row,
-                canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable)result, server);
+                canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable) result, server);
             if (retry == Retry.YES) {
               toReplay.add(sentAction);
             } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
@@ -1317,24 +1336,11 @@ class AsyncProcess {
               ++failed;
             }
           } else {
-
-            if (AsyncProcess.this.connection.getConnectionMetrics() != null) {
-              AsyncProcess.this.connection.getConnectionMetrics().
-                      updateServerStats(server, regionName, result);
-            }
-
-            // update the stats about the region, if its a user table. We don't want to slow down
-            // updates to meta tables, especially from internal updates (master, etc).
-            if (AsyncProcess.this.connection.getStatisticsTracker() != null) {
-              result = ResultStatsUtil.updateStats(result,
-                  AsyncProcess.this.connection.getStatisticsTracker(), server, regionName);
-            }
-
             if (callback != null) {
               try {
                 //noinspection unchecked
                 // TODO: would callback expect a replica region name if it gets one?
-                this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result);
+                this.callback.update(regionName, sentAction.getAction().getRow(), (CResult) result);
               } catch (Throwable t) {
                 LOG.error("User callback threw an exception for "
                     + Bytes.toStringBinary(regionName) + ", ignoring", t);
@@ -1384,7 +1390,6 @@ class AsyncProcess {
           }
         }
       }
-
       if (toReplay.isEmpty()) {
         logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
       } else {
@@ -1438,8 +1443,8 @@ class AsyncProcess {
       boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
       int index = action.getOriginalIndex();
       if (results == null) {
-         decActionCounter(index);
-         return; // Simple case, no replica requests.
+        decActionCounter(index);
+        return; // Simple case, no replica requests.
       }
       state = trySetResultSimple(index, action.getAction(), false, result, null, isStale);
       if (state == null) {
@@ -1618,7 +1623,7 @@ class AsyncProcess {
         throw new InterruptedIOException(iex.getMessage());
       } finally {
         if (callsInProgress != null) {
-          for (MultiServerCallable<Row> clb : callsInProgress) {
+          for (PayloadCarryingServerCallable clb : callsInProgress) {
             clb.cancel();
           }
         }
@@ -1675,13 +1680,38 @@ class AsyncProcess {
     }
   }
 
+  private void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
+    boolean metrics = AsyncProcess.this.connection.getConnectionMetrics() != null;
+    boolean stats = AsyncProcess.this.connection.getStatisticsTracker() != null;
+    if (!stats && !metrics) {
+      return;
+    }
+    for (Map.Entry<byte[], MultiResponse.RegionResult> regionStats : results.entrySet()) {
+      byte[] regionName = regionStats.getKey();
+      ClientProtos.RegionLoadStats stat = regionStats.getValue().getStat();
+      ResultStatsUtil.updateStats(AsyncProcess.this.connection.getStatisticsTracker(), server,
+          regionName, stat);
+      ResultStatsUtil.updateStats(AsyncProcess.this.connection.getConnectionMetrics(),
+          server, regionName, stat);
+    }
+  }
+
+  protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
+      TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
+      Batch.Callback<CResult> callback, Object[] results, boolean needResults,
+      PayloadCarryingServerCallable callable, int curTimeout) {
+    return new AsyncRequestFutureImpl<CResult>(
+        tableName, actions, nonceGroup, getPool(pool), needResults,
+        results, callback, callable, curTimeout);
+  }
+
   @VisibleForTesting
   /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */
   protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
       TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
       Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
-    return new AsyncRequestFutureImpl<CResult>(
-        tableName, actions, nonceGroup, getPool(pool), needResults, results, callback);
+    return createAsyncRequestFuture(
+        tableName, actions, nonceGroup, pool, callback, results, needResults, null, timeout);
   }
 
   /**
@@ -1697,7 +1727,7 @@ class AsyncProcess {
    * Create a caller. Isolated to be easily overridden in the tests.
    */
   @VisibleForTesting
-  protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
+  protected RpcRetryingCaller<MultiResponse> createCaller(PayloadCarryingServerCallable callable) {
     return rpcCallerFactory.<MultiResponse> newCaller();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index ce5a44c..33fd94e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -37,6 +37,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
@@ -263,7 +265,8 @@ public class HTable implements HTableInterface {
    */
   @Override
   public HTableDescriptor getTableDescriptor() throws IOException {
-    HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, operationTimeout);
+    HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection,
+        rpcCallerFactory, operationTimeout);
     if (htd != null) {
       return new UnmodifyableHTableDescriptor(htd);
     }
@@ -450,10 +453,10 @@ public class HTable implements HTableInterface {
 
     // Call that takes into account the replica
     RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
-      rpcControllerFactory, tableName, this.connection, get, pool,
-      tableConfiguration.getRetriesNumber(),
-      operationTimeout,
-      tableConfiguration.getPrimaryCallTimeoutMicroSecond());
+        rpcControllerFactory, tableName, this.connection, get, pool,
+        tableConfiguration.getRetriesNumber(),
+        operationTimeout,
+        tableConfiguration.getPrimaryCallTimeoutMicroSecond());
     return callable.call();
   }
 
@@ -587,35 +590,47 @@ public class HTable implements HTableInterface {
    */
   @Override
   public void mutateRow(final RowMutations rm) throws IOException {
-    RegionServerCallable<Void> callable =
-        new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
-      @Override
-      public Void call(int callTimeout) throws IOException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setPriority(tableName);
-        controller.setCallTimeout(callTimeout);
-        try {
-          RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
-            getLocation().getRegionInfo().getRegionName(), rm);
-          regionMutationBuilder.setAtomic(true);
-          MultiRequest request =
-            MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
-          ClientProtos.MultiResponse response = getStub().multi(controller, request);
-          ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
-          if (res.hasException()) {
-            Throwable ex = ProtobufUtil.toException(res.getException());
-            if(ex instanceof IOException) {
-              throw (IOException)ex;
+    final RetryingTimeTracker tracker = new RetryingTimeTracker();
+    PayloadCarryingServerCallable<MultiResponse> callable =
+      new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
+          rpcControllerFactory) {
+        @Override
+        public MultiResponse call(int callTimeout) throws IOException {
+          tracker.start();
+          controller.setPriority(tableName);
+          int remainingTime = tracker.getRemainingTime(callTimeout);
+          if (remainingTime == 0) {
+            throw new DoNotRetryIOException("Timeout for mutate row");
+          }
+          controller.setCallTimeout(remainingTime);
+          try {
+            RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
+                getLocation().getRegionInfo().getRegionName(), rm);
+            regionMutationBuilder.setAtomic(true);
+            MultiRequest request =
+                MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
+            ClientProtos.MultiResponse response = getStub().multi(controller, request);
+            ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
+            if (res.hasException()) {
+              Throwable ex = ProtobufUtil.toException(res.getException());
+              if (ex instanceof IOException) {
+                throw (IOException) ex;
+              }
+              throw new IOException("Failed to mutate row: " +
+                  Bytes.toStringBinary(rm.getRow()), ex);
             }
-            throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex);
+            return ResponseConverter.getResults(request, response, controller.cellScanner());
+          } catch (ServiceException se) {
+            throw ProtobufUtil.getRemoteException(se);
           }
-        } catch (ServiceException se) {
-          throw ProtobufUtil.getRemoteException(se);
         }
-        return null;
-      }
-    };
-    rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
+      };
+    AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
+        null, null, callable, operationTimeout);
+    ars.waitUntilDone();
+    if (ars.hasError()) {
+      throw ars.getErrors();
+    }
   }
 
   /**
@@ -860,37 +875,55 @@ public class HTable implements HTableInterface {
    */
   @Override
   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
-      final CompareOp compareOp, final byte [] value, final RowMutations rm)
-  throws IOException {
-    RegionServerCallable<Boolean> callable =
-        new RegionServerCallable<Boolean>(connection, getName(), row) {
-          @Override
-          public Boolean call(int callTimeout) throws IOException {
-            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-            controller.setPriority(tableName);
-            controller.setCallTimeout(callTimeout);
-            try {
-              CompareType compareType = CompareType.valueOf(compareOp.name());
-              MultiRequest request = RequestConverter.buildMutateRequest(
-                  getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
-                  new BinaryComparator(value), compareType, rm);
-              ClientProtos.MultiResponse response = getStub().multi(controller, request);
-              ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
-              if (res.hasException()) {
-                Throwable ex = ProtobufUtil.toException(res.getException());
-                if(ex instanceof IOException) {
-                  throw (IOException)ex;
-                }
-                throw new IOException("Failed to checkAndMutate row: "+
-                    Bytes.toStringBinary(rm.getRow()), ex);
+    final CompareOp compareOp, final byte [] value, final RowMutations rm)
+    throws IOException {
+    final RetryingTimeTracker tracker = new RetryingTimeTracker();
+    PayloadCarryingServerCallable<MultiResponse> callable =
+      new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
+        rpcControllerFactory) {
+        @Override
+        public MultiResponse call(int callTimeout) throws IOException {
+          tracker.start();
+          controller.setPriority(tableName);
+          int remainingTime = tracker.getRemainingTime(callTimeout);
+          if (remainingTime == 0) {
+            throw new DoNotRetryIOException("Timeout for mutate row");
+          }
+          controller.setCallTimeout(remainingTime);
+          try {
+            CompareType compareType = CompareType.valueOf(compareOp.name());
+            MultiRequest request = RequestConverter.buildMutateRequest(
+              getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+              new BinaryComparator(value), compareType, rm);
+            ClientProtos.MultiResponse response = getStub().multi(controller, request);
+            ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
+            if (res.hasException()) {
+              Throwable ex = ProtobufUtil.toException(res.getException());
+              if(ex instanceof IOException) {
+                throw (IOException)ex;
               }
-              return Boolean.valueOf(response.getProcessed());
-            } catch (ServiceException se) {
-              throw ProtobufUtil.getRemoteException(se);
+              throw new IOException("Failed to checkAndMutate row: "+
+                                    Bytes.toStringBinary(rm.getRow()), ex);
             }
+            return ResponseConverter.getResults(request, response, controller.cellScanner());
+          } catch (ServiceException se) {
+            throw ProtobufUtil.getRemoteException(se);
           }
-        };
-    return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
+        }
+      };
+    /**
+     *  Currently, we use one array to store 'processed' flag which is returned by server.
+     *  It is excessive to send such a large array, but that is required by the framework right now
+     * */
+    Object[] results = new Object[rm.getMutations().size()];
+    AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
+      null, results, callable, operationTimeout);
+    ars.waitUntilDone();
+    if (ars.hasError()) {
+      throw ars.getErrors();
+    }
+
+    return ((Result)results[0]).getExists();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
index 6a292bc..400f505 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -52,7 +52,7 @@ import static com.codahale.metrics.MetricRegistry.name;
  * {@link #shutdown()} to terminate the thread pools they allocate.
  */
 @InterfaceAudience.Private
-public class MetricsConnection {
+public class MetricsConnection implements StatisticTrackable {
 
   /** Set this key to {@code true} to enable metrics collection of client requests. */
   public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
@@ -199,9 +199,15 @@ public class MetricsConnection {
     }
     Result result = (Result) r;
     ClientProtos.RegionLoadStats stats = result.getStats();
-    if(stats == null){
+    if (stats == null) {
       return;
     }
+    updateRegionStats(serverName, regionName, stats);
+  }
+
+  @Override
+  public void updateRegionStats(ServerName serverName, byte[] regionName,
+    ClientProtos.RegionLoadStats stats) {
     String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
     ConcurrentMap<byte[], RegionStats> rsStats = null;
     if (serverStats.containsKey(serverName)) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
index 089ccff..79a9ed3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -33,8 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 public class MultiResponse {
 
   // map of regionName to map of Results by the original index for that Result
-  private Map<byte[], Map<Integer, Object>> results =
-      new TreeMap<byte[], Map<Integer, Object>>(Bytes.BYTES_COMPARATOR);
+  private Map<byte[], RegionResult> results = new TreeMap<>(Bytes.BYTES_COMPARATOR);
 
   /**
    * The server can send us a failure for the region itself, instead of individual failure.
@@ -52,8 +52,8 @@ public class MultiResponse {
    */
   public int size() {
     int size = 0;
-    for (Map<?,?> c : results.values()) {
-      size += c.size();
+    for (RegionResult result: results.values()) {
+      size += result.size();
     }
     return size;
   }
@@ -66,16 +66,7 @@ public class MultiResponse {
    * @param resOrEx the result or error; will be empty for successful Put and Delete actions.
    */
   public void add(byte[] regionName, int originalIndex, Object resOrEx) {
-    Map<Integer, Object> rs = results.get(regionName);
-    if (rs == null) {
-      rs = new HashMap<Integer, Object>();
-      results.put(regionName, rs);
-    }
-    rs.put(originalIndex, resOrEx);
-  }
-
-  public Map<byte[], Map<Integer, Object>> getResults() {
-    return results;
+    getResult(regionName).addResult(originalIndex, resOrEx);
   }
 
   public void addException(byte []regionName, Throwable ie){
@@ -92,4 +83,42 @@ public class MultiResponse {
   public Map<byte[], Throwable> getExceptions() {
     return exceptions;
   }
+
+  public void addStatistic(byte[] regionName, ClientProtos.RegionLoadStats stat) {
+    getResult(regionName).setStat(stat);
+  }
+
+  private RegionResult getResult(byte[] region){
+    RegionResult rs = results.get(region);
+    if (rs == null) {
+      rs = new RegionResult();
+      results.put(region, rs);
+    }
+    return rs;
+  }
+
+  public Map<byte[], RegionResult> getResults(){
+    return this.results;
+  }
+
+  static class RegionResult{
+    Map<Integer, Object> result = new HashMap<>();
+    ClientProtos.RegionLoadStats stat;
+
+    public void addResult(int index, Object result){
+      this.result.put(index, result);
+    }
+
+    public void setStat(ClientProtos.RegionLoadStats stat){
+      this.stat = stat;
+    }
+
+    public int size() {
+      return this.result.size();
+    }
+
+    public ClientProtos.RegionLoadStats getStat() {
+      return this.stat;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index 85b401e..f78f348 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -50,21 +49,19 @@ import com.google.protobuf.ServiceException;
  * {@link RegionServerCallable} that goes against multiple regions.
  * @param <R>
  */
-class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> implements Cancellable {
+class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse> {
   private final MultiAction<R> multiAction;
   private final boolean cellBlock;
-  private final PayloadCarryingRpcController controller;
 
   MultiServerCallable(final ClusterConnection connection, final TableName tableName,
       final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi) {
-    super(connection, tableName, null);
+    super(connection, tableName, null, rpcFactory);
     this.multiAction = multi;
     // RegionServerCallable has HRegionLocation field, but this is a multi-region request.
     // Using region info from parent HRegionLocation would be a mistake for this class; so
     // we will store the server here, and throw if someone tries to obtain location/regioninfo.
     this.location = new HRegionLocation(null, location);
     this.cellBlock = isCellBlock();
-    controller = rpcFactory.newController();
   }
 
   @Override
@@ -133,16 +130,6 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> impleme
     return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
   }
 
-  @Override
-  public void cancel() {
-    controller.startCancel();
-  }
-
-  @Override
-  public boolean isCancelled() {
-    return controller.isCanceled();
-  }
-
   /**
    * @return True if we should send data in cellblocks.  This is an expensive call.  Cache the
    * result if you can rather than call each time.

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
new file mode 100644
index 0000000..d94f069
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+
+/**
+ * This class is used to unify HTable calls with AsyncProcess Framework.
+ * HTable can use AsyncProcess directly though this class.
+ */
+@InterfaceAudience.Private
+public abstract class PayloadCarryingServerCallable<T>
+    extends RegionServerCallable<T> implements Cancellable {
+  protected PayloadCarryingRpcController controller;
+
+  public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row,
+    RpcControllerFactory rpcControllerFactory) {
+    super(connection, tableName, row);
+    this.controller = rpcControllerFactory.newController();
+  }
+
+  @Override
+  public void cancel() {
+    controller.startCancel();
+  }
+
+  @Override
+  public boolean isCancelled() {
+    return controller.isCanceled();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
index 3caa63e..6537d79 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
@@ -55,13 +55,17 @@ public final class ResultStatsUtil {
       return r;
     }
 
-    if (regionName != null) {
-      serverStats.updateRegionStats(server, regionName, stats);
-    }
-
+    updateStats(serverStats, server, regionName, stats);
     return r;
   }
 
+  public static void updateStats(StatisticTrackable tracker, ServerName server, byte[] regionName,
+    ClientProtos.RegionLoadStats stats) {
+    if (regionName != null && stats != null && tracker != null) {
+      tracker.updateRegionStats(server, regionName, stats);
+    }
+  }
+
   public static <T> T updateStats(T r, ServerStatisticTracker stats,
       HRegionLocation regionLocation) {
     byte[] regionName = null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
new file mode 100644
index 0000000..24288e6
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Tracks the amount of time remaining for an operation.
+ */
+class RetryingTimeTracker {
+
+  private long globalStartTime = -1;
+
+  public void start() {
+    if (this.globalStartTime < 0) {
+      this.globalStartTime = EnvironmentEdgeManager.currentTime();
+    }
+  }
+
+  public int getRemainingTime(int callTimeout) {
+    if (callTimeout <= 0) {
+      return 0;
+    } else {
+      if (callTimeout == Integer.MAX_VALUE) {
+        return Integer.MAX_VALUE;
+      }
+      int remainingTime = (int) (
+        callTimeout -
+        (EnvironmentEdgeManager.currentTime() - this.globalStartTime));
+      if (remainingTime < 1) {
+        // If there is no time left, we're trying anyway. It's too late.
+        // 0 means no timeout, and it's not the intent here. So we secure both cases by
+        // resetting to the minimum.
+        remainingTime = 1;
+      }
+      return remainingTime;
+    }
+  }
+
+  public long getStartTime() {
+    return this.globalStartTime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
index 0af8210..550812f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
@@ -67,12 +67,6 @@ public class RpcRetryingCallerFactory {
     //  is cheap as it does not require parsing a complex structure.
     RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<T>(pause, retries, interceptor,
         startLogErrorsCnt);
-
-    // wrap it with stats, if we are tracking them
-    if (enableBackPressure && this.stats != null) {
-      caller = new StatsTrackingRpcRetryingCaller<T>(caller, this.stats);
-    }
-
     return caller;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
index 12abc6a..6ce4956 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
@@ -51,10 +51,6 @@ import com.google.protobuf.ServiceException;
 public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
   // LOG is being used in TestMultiRowRangeFilter, hence leaving it public
   public static final Log LOG = LogFactory.getLog(RpcRetryingCallerImpl.class);
-  /**
-   * When we started making calls.
-   */
-  private long globalStartTime;
 
   /** How many retries are allowed before we start to log */
   private final int startLogErrorsCnt;
@@ -64,6 +60,7 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
   private final AtomicBoolean cancelled = new AtomicBoolean(false);
   private final RetryingCallerInterceptor interceptor;
   private final RetryingCallerInterceptorContext context;
+  private final RetryingTimeTracker tracker;
 
   public RpcRetryingCallerImpl(long pause, int retries, int startLogErrorsCnt) {
     this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt);
@@ -76,23 +73,7 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
     this.interceptor = interceptor;
     context = interceptor.createEmptyContext();
     this.startLogErrorsCnt = startLogErrorsCnt;
-  }
-
-  private int getRemainingTime(int callTimeout) {
-    if (callTimeout <= 0) {
-      return 0;
-    } else {
-      if (callTimeout == Integer.MAX_VALUE) return Integer.MAX_VALUE;
-      int remainingTime = (int) (callTimeout -
-          (EnvironmentEdgeManager.currentTime() - this.globalStartTime));
-      if (remainingTime < 1) {
-        // If there is no time left, we're trying anyway. It's too late.
-        // 0 means no timeout, and it's not the intent here. So we secure both cases by
-        // resetting to the minimum.
-        remainingTime = 1;
-      }
-      return remainingTime;
-    }
+    this.tracker = new RetryingTimeTracker();
   }
   
   @Override
@@ -108,21 +89,21 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
   throws IOException, RuntimeException {
     List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
       new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
-    this.globalStartTime = EnvironmentEdgeManager.currentTime();
+    tracker.start();
     context.clear();
     for (int tries = 0;; tries++) {
       long expectedSleep;
       try {
         callable.prepare(tries != 0); // if called with false, check table status on ZK
         interceptor.intercept(context.prepare(callable, tries));
-        return callable.call(getRemainingTime(callTimeout));
+        return callable.call(tracker.getRemainingTime(callTimeout));
       } catch (PreemptiveFastFailException e) {
         throw e;
       } catch (Throwable t) {
         ExceptionUtil.rethrowIfInterrupt(t);
         if (tries > startLogErrorsCnt) {
           LOG.info("Call exception, tries=" + tries + ", maxAttempts=" + maxAttempts + ", started="
-              + (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, "
+              + (EnvironmentEdgeManager.currentTime() - tracker.getStartTime()) + " ms ago, "
               + "cancelled=" + cancelled.get() + ", msg="
               + callable.getExceptionMessageAdditionalDetail());
         }
@@ -172,14 +153,13 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
    * @return Calculate how long a single call took
    */
   private long singleCallDuration(final long expectedSleep) {
-    return (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + expectedSleep;
+    return (EnvironmentEdgeManager.currentTime() - tracker.getStartTime()) + expectedSleep;
   }
 
   @Override
   public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
   throws IOException, RuntimeException {
     // The code of this method should be shared with withRetries.
-    this.globalStartTime = EnvironmentEdgeManager.currentTime();
     try {
       callable.prepare(false);
       return callable.call(callTimeout);
@@ -231,7 +211,7 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
 
   @Override
   public String toString() {
-    return "RpcRetryingCaller{" + "globalStartTime=" + globalStartTime +
+    return "RpcRetryingCaller{" + "globalStartTime=" + tracker.getStartTime() +
         ", pause=" + pause + ", maxAttempts=" + maxAttempts + '}';
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
index d03ecf6..b8e7923 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
@@ -31,11 +31,12 @@ import java.util.concurrent.ConcurrentHashMap;
  * Tracks the statistics for multiple regions
  */
 @InterfaceAudience.Private
-public class ServerStatisticTracker {
+public class ServerStatisticTracker implements StatisticTrackable {
 
   private final ConcurrentHashMap<ServerName, ServerStatistics> stats =
       new ConcurrentHashMap<ServerName, ServerStatistics>();
 
+  @Override
   public void updateRegionStats(ServerName server, byte[] region, ClientProtos.RegionLoadStats
       currentStats) {
     ServerStatistics stat = stats.get(server);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticTrackable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticTrackable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticTrackable.java
new file mode 100644
index 0000000..7bb49e7
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticTrackable.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+
+/**
+ * Parent interface for an object to get updates about per-region statistics.
+ */
+@InterfaceAudience.Private
+public interface StatisticTrackable {
+  /**
+   *  Update stats per region.
+   * */
+  void updateRegionStats(ServerName server, byte[] region, ClientProtos.RegionLoadStats
+    stats);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
deleted file mode 100644
index e82f1e8..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-import java.io.IOException;
-
-/**
- * An {@link RpcRetryingCaller} that will update the per-region stats for the call on return,
- * if stats are available
- */
-@InterfaceAudience.Private
-public class StatsTrackingRpcRetryingCaller<T> implements RpcRetryingCaller<T> {
-  private final ServerStatisticTracker stats;
-  private final RpcRetryingCaller<T> delegate;
-
-  public StatsTrackingRpcRetryingCaller(RpcRetryingCaller<T> delegate,
-      ServerStatisticTracker stats) {
-    this.delegate = delegate;
-    this.stats = stats;
-  }
-
-  @Override
-  public void cancel() {
-    delegate.cancel();
-  }
-
-  @Override
-  public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
-      throws IOException, RuntimeException {
-    T result = delegate.callWithRetries(callable, callTimeout);
-    return updateStatsAndUnwrap(result, callable);
-  }
-
-  @Override
-  public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
-      throws IOException, RuntimeException {
-    T result = delegate.callWithRetries(callable, callTimeout);
-    return updateStatsAndUnwrap(result, callable);
-  }
-
-  private T updateStatsAndUnwrap(T result, RetryingCallable<T> callable) {
-    // don't track stats about requests that aren't to regionservers
-    if (!(callable instanceof RegionServerCallable)) {
-      return result;
-    }
-
-    // mutli-server callables span multiple regions, so they don't have a location,
-    // but they are region server callables, so we have to handle them when we process the
-    // result in AsyncProcess#receiveMultiAction, not in here
-    if (callable instanceof MultiServerCallable) {
-      return result;
-    }
-
-    // update the stats for the single server callable
-    RegionServerCallable<T> regionCallable = (RegionServerCallable) callable;
-    HRegionLocation location = regionCallable.getLocation();
-    return ResultStatsUtil.updateStats(result, stats, location);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 261a9aa..b052e63 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -193,8 +193,8 @@ public final class ProtobufUtil {
    */
   private final static Cell[] EMPTY_CELL_ARRAY = new Cell[]{};
   private final static Result EMPTY_RESULT = Result.create(EMPTY_CELL_ARRAY);
-  private final static Result EMPTY_RESULT_EXISTS_TRUE = Result.create(null, true);
-  private final static Result EMPTY_RESULT_EXISTS_FALSE = Result.create(null, false);
+  final static Result EMPTY_RESULT_EXISTS_TRUE = Result.create(null, true);
+  final static Result EMPTY_RESULT_EXISTS_FALSE = Result.create(null, false);
   private final static Result EMPTY_RESULT_STALE = Result.create(EMPTY_CELL_ARRAY, null, true);
   private final static Result EMPTY_RESULT_EXISTS_TRUE_STALE
     = Result.create((Cell[])null, true, true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
index 177b1c7..421907d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
@@ -89,7 +89,7 @@ public final class ResponseConverter {
     int requestRegionActionCount = request.getRegionActionCount();
     int responseRegionActionResultCount = response.getRegionActionResultCount();
     if (requestRegionActionCount != responseRegionActionResultCount) {
-      throw new IllegalStateException("Request mutation count=" + responseRegionActionResultCount +
+      throw new IllegalStateException("Request mutation count=" + requestRegionActionCount +
           " does not match response mutation result count=" + responseRegionActionResultCount);
     }
 
@@ -125,21 +125,27 @@ public final class ResponseConverter {
           responseValue = ProtobufUtil.toException(roe.getException());
         } else if (roe.hasResult()) {
           responseValue = ProtobufUtil.toResult(roe.getResult(), cells);
-          // add the load stats, if we got any
-          if (roe.hasLoadStats()) {
-            ((Result) responseValue).addResults(roe.getLoadStats());
-          }
         } else if (roe.hasServiceResult()) {
           responseValue = roe.getServiceResult();
-        } else {
-          // no result & no exception. Unexpected.
-          throw new IllegalStateException("No result & no exception roe=" + roe +
-              " for region " + actions.getRegion());
+        } else{
+          // Sometimes, the response is just "it was processed". Generally, this occurs for things
+          // like mutateRows where either we get back 'processed' (or not) and optionally some
+          // statistics about the regions we touched.
+          responseValue = response.getProcessed() ?
+                          ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE :
+                          ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
         }
         results.add(regionName, roe.getIndex(), responseValue);
       }
     }
 
+    if (response.hasRegionStatistics()) {
+      ClientProtos.MultiRegionLoadStats stats = response.getRegionStatistics();
+      for (int i = 0; i < stats.getRegionCount(); i++) {
+        results.addStatistic(stats.getRegion(i).getValue().toByteArray(), stats.getStat(i));
+      }
+    }
+
     return results;
   }
 
@@ -161,11 +167,9 @@ public final class ResponseConverter {
    * @param r
    * @return an action result builder
    */
-  public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r,
-      ClientProtos.RegionLoadStats stats) {
+  public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r) {
     ResultOrException.Builder builder = ResultOrException.newBuilder();
     if (r != null) builder.setResult(r);
-    if(stats != null) builder.setLoadStats(stats);
     return builder;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 645cc42..1003d24 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -185,10 +185,12 @@ public class TestAsyncProcess {
     }
 
     @Override
-    protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
+    protected RpcRetryingCaller<MultiResponse> createCaller(
+        PayloadCarryingServerCallable callable) {
       callsCt.incrementAndGet();
+      MultiServerCallable callable1 = (MultiServerCallable) callable;
       final MultiResponse mr = createMultiResponse(
-          callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
+          callable1.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
             @Override
             public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
               if (Arrays.equals(FAILS, a.getAction().getRow())) {
@@ -227,7 +229,8 @@ public class TestAsyncProcess {
     }
 
     @Override
-    public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
+    public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
+                                            int callTimeout)
         throws IOException, RuntimeException {
       throw e;
     }
@@ -245,7 +248,8 @@ public class TestAsyncProcess {
     }
 
     @Override
-    protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
+    protected RpcRetryingCaller<MultiResponse> createCaller(
+      PayloadCarryingServerCallable callable) {
       callsCt.incrementAndGet();
       return new CallerWithFailure(ioe);
     }
@@ -282,7 +286,8 @@ public class TestAsyncProcess {
 
     @Override
     protected RpcRetryingCaller<MultiResponse> createCaller(
-        MultiServerCallable<Row> callable) {
+        PayloadCarryingServerCallable payloadCallable) {
+      MultiServerCallable<Row> callable = (MultiServerCallable) payloadCallable;
       final MultiResponse mr = createMultiResponse(
           callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
             @Override
@@ -312,7 +317,8 @@ public class TestAsyncProcess {
 
       return new RpcRetryingCallerImpl<MultiResponse>(100, 10, 9) {
         @Override
-        public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
+        public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
+                                                int callTimeout)
         throws IOException, RuntimeException {
           long sleep = -1;
           if (isDefault) {