Posted to commits@hbase.apache.org by jy...@apache.org on 2014/12/16 20:12:48 UTC

[2/2] hbase git commit: HBASE-5162 Basic client pushback mechanism

HBASE-5162 Basic client pushback mechanism

Instead of just blocking the client for 90 seconds when the region gets too
busy, the server now sends region load stats back to the client so the client
can tell how busy the server is. Currently it's just the load on the memstore,
but it can be extended to other stats (e.g. CPU, general memory, etc.).
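
For illustration only (not part of this patch): a minimal sketch of reading the
new per-result stats on the client, assuming an already-open HTableInterface.
Only Result.getStats() comes from this change; the RegionLoadStats getters live
in the Client.proto change below and are not spelled out here.

import java.io.IOException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;

class RegionLoadStatsExample {
  static void printLoad(HTableInterface table, byte[] row) throws IOException {
    Result result = table.get(new Get(row));
    // getStats() is added to Result by this patch; it is null when no
    // statistics were attached to the result.
    ClientProtos.RegionLoadStats stats = result.getStats();
    if (stats != null) {
      System.out.println("load stats for row " + Bytes.toString(row) + ": " + stats);
    }
  }
}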

It is then up to the client to decide whether to act on these stats. By
default the client ignores them, but it can easily be switched to the built-in
exponential back-off, or users can plug in their own back-off implementation.
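
Also for illustration (not from the patch): a sketch of turning the pushback
machinery on and of what a pluggable back-off policy could look like. Only
HConstants.ENABLE_CLIENT_BACKPRESSURE, ClientBackoffPolicy, ServerStatistics
and the built-in ExponentialClientBackoffPolicy come from this change; the
fixed-delay policy below is hypothetical, it assumes getBackoffTime(...) is the
interface's single method (as its use in AsyncProcess suggests), and the way a
custom policy class gets picked up by ClientBackoffPolicyFactory is not shown
in this excerpt.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;

public class FixedDelayBackoffPolicy implements ClientBackoffPolicy {

  // Hypothetical policy: back off a flat 50 ms whenever the server has
  // reported any statistics for the region, and not at all otherwise.
  private static final long FIXED_DELAY_MS = 50;

  @Override
  public long getBackoffTime(ServerName server, byte[] region, ServerStatistics stats) {
    return stats == null ? 0 : FIXED_DELAY_MS;
  }

  public static void main(String[] args) {
    // Pushback is off by default; this flag makes the connection create a
    // ServerStatisticTracker and consult the configured ClientBackoffPolicy.
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
  }
}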


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a411227b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a411227b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a411227b

Branch: refs/heads/master
Commit: a411227b0ebf78b4ee8ae7179e162b54734e77de
Parents: e5d813c
Author: Jesse Yates <je...@gmail.com>
Authored: Tue Oct 28 16:14:16 2014 -0700
Committer: Jesse Yates <jy...@apache.org>
Committed: Tue Dec 16 11:14:30 2014 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       | 120 ++-
 .../hadoop/hbase/client/ClusterConnection.java  |  12 +-
 .../hadoop/hbase/client/ConnectionAdapter.java  |  11 +
 .../hadoop/hbase/client/ConnectionManager.java  |  26 +-
 .../hadoop/hbase/client/DelayingRunner.java     | 116 +++
 .../org/apache/hadoop/hbase/client/HTable.java  |   5 +-
 .../apache/hadoop/hbase/client/MultiAction.java |  17 +-
 .../org/apache/hadoop/hbase/client/Result.java  |  20 +-
 .../hadoop/hbase/client/ResultStatsUtil.java    |  76 ++
 .../hadoop/hbase/client/RpcRetryingCaller.java  | 217 +----
 .../hbase/client/RpcRetryingCallerFactory.java  |  42 +-
 .../hbase/client/RpcRetryingCallerImpl.java     | 238 ++++++
 .../hbase/client/ServerStatisticTracker.java    |  74 ++
 .../client/StatsTrackingRpcRetryingCaller.java  |  77 ++
 .../client/backoff/ClientBackoffPolicy.java     |  42 +
 .../backoff/ClientBackoffPolicyFactory.java     |  59 ++
 .../backoff/ExponentialClientBackoffPolicy.java |  71 ++
 .../hbase/client/backoff/ServerStatistics.java  |  68 ++
 .../hbase/ipc/RegionCoprocessorRpcChannel.java  |   2 +-
 .../hbase/protobuf/ResponseConverter.java       |  16 +-
 .../hadoop/hbase/client/TestAsyncProcess.java   |   6 +-
 .../client/TestClientExponentialBackoff.java    | 110 +++
 .../client/TestFastFailWithoutTestUtil.java     |   2 +-
 .../org/apache/hadoop/hbase/HConstants.java     |   6 +
 .../hbase/protobuf/generated/ClientProtos.java  | 810 ++++++++++++++++++-
 hbase-protocol/src/main/protobuf/Client.proto   |  10 +
 .../hbase/mapreduce/LoadIncrementalHFiles.java  |   3 +-
 .../hadoop/hbase/regionserver/HRegion.java      |  34 +-
 .../hbase/regionserver/RSRpcServices.java       |  18 +-
 .../regionserver/wal/WALEditsReplaySink.java    |   2 +-
 .../hbase/client/HConnectionTestingUtility.java |   2 +-
 .../hadoop/hbase/client/TestClientPushback.java |  94 +++
 .../hadoop/hbase/client/TestReplicasClient.java |   2 +-
 .../TestRegionReplicaReplicationEndpoint.java   |   3 +-
 34 files changed, 2110 insertions(+), 301 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index e0c14a6..8b1db8f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -313,7 +315,8 @@ class AsyncProcess {
    * Uses default ExecutorService for this AP (must have been created with one).
    */
   public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
-      boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults) throws InterruptedIOException {
+      boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults)
+      throws InterruptedIOException {
     return submit(null, tableName, rows, atLeastOne, callback, needResults);
   }
 
@@ -374,7 +377,7 @@ class AsyncProcess {
           locationErrors = new ArrayList<Exception>();
           locationErrorRows = new ArrayList<Integer>();
           LOG.error("Failed to get region location ", ex);
-          // This action failed before creating ars. Add it to retained but do not add to submit list.
+          // This action failed before creating ars. Retain it, but do not add to submit list.
           // We will then add it to ars in an already-failed state.
           retainedActions.add(new Action<Row>(r, ++posInList));
           locationErrors.add(ex);
@@ -918,14 +921,12 @@ class AsyncProcess {
       return loc;
     }
 
-
-
     /**
      * Send a multi action structure to the servers, after a delay depending on the attempt
      * number. Asynchronous.
      *
      * @param actionsByServer the actions structured by regions
-     * @param numAttempt      the attempt number.
+     * @param numAttempt the attempt number.
      * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
      */
     private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
@@ -935,33 +936,98 @@ class AsyncProcess {
       int actionsRemaining = actionsByServer.size();
       // This iteration is by server (the HRegionLocation comparator is by server portion only).
       for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
-        final ServerName server = e.getKey();
-        final MultiAction<Row> multiAction = e.getValue();
+        ServerName server = e.getKey();
+        MultiAction<Row> multiAction = e.getValue();
         incTaskCounters(multiAction.getRegions(), server);
-        Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction",
-            new SingleServerRequestRunnable(multiAction, numAttempt, server));
-        if ((--actionsRemaining == 0) && reuseThread) {
-          runnable.run();
-        } else {
-          try {
-            pool.submit(runnable);
-          } catch (RejectedExecutionException ree) {
-            // This should never happen. But as the pool is provided by the end user, let's secure
-            //  this a little.
-            decTaskCounters(multiAction.getRegions(), server);
-            LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
-                " Server is " + server.getServerName(), ree);
-            // We're likely to fail again, but this will increment the attempt counter, so it will
-            //  finish.
-            receiveGlobalFailure(multiAction, server, numAttempt, ree);
+        Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
+            numAttempt);
+        // make sure we correctly count the number of runnables before we try to reuse the send
+        // thread, in case we had to split the request into different runnables because of backoff
+        if (runnables.size() > actionsRemaining) {
+          actionsRemaining = runnables.size();
+        }
+
+        // run all the runnables
+        for (Runnable runnable : runnables) {
+          if ((--actionsRemaining == 0) && reuseThread) {
+            runnable.run();
+          } else {
+            try {
+              pool.submit(runnable);
+            } catch (RejectedExecutionException ree) {
+              // This should never happen. But as the pool is provided by the end user, let's secure
+              //  this a little.
+              decTaskCounters(multiAction.getRegions(), server);
+              LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
+                  " Server is " + server.getServerName(), ree);
+              // We're likely to fail again, but this will increment the attempt counter, so it will
+              //  finish.
+              receiveGlobalFailure(multiAction, server, numAttempt, ree);
+            }
           }
         }
       }
+
       if (actionsForReplicaThread != null) {
         startWaitingForReplicaCalls(actionsForReplicaThread);
       }
     }
 
+    private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
+        MultiAction<Row> multiAction,
+        int numAttempt) {
+      // no stats to manage, just do the standard action
+      if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
+        return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
+            new SingleServerRequestRunnable(multiAction, numAttempt, server)));
+      }
+
+      // group the actions by the amount of delay
+      Map<Long, DelayingRunner> actions = new HashMap<Long, DelayingRunner>(multiAction
+          .size());
+
+      // split up the actions
+      for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) {
+        Long backoff = getBackoff(server, e.getKey());
+        DelayingRunner runner = actions.get(backoff);
+        if (runner == null) {
+          actions.put(backoff, new DelayingRunner(backoff, e));
+        } else {
+          runner.add(e);
+        }
+      }
+
+      List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
+      for (DelayingRunner runner : actions.values()) {
+        String traceText = "AsyncProcess.sendMultiAction";
+        Runnable runnable =
+            new SingleServerRequestRunnable(runner.getActions(), numAttempt, server);
+        // use a delay runner only if we need to sleep for some time
+        if (runner.getSleepTime() > 0) {
+          runner.setRunner(runnable);
+          traceText = "AsyncProcess.clientBackoff.sendMultiAction";
+          runnable = runner;
+        }
+        runnable = Trace.wrap(traceText, runnable);
+        toReturn.add(runnable);
+
+      }
+      return toReturn;
+    }
+
+    /**
+     * @param server server location where the target region is hosted
+     * @param regionName name of the region to which we are going to write some data
+     * @return the amount of time the client should wait before submitting a request to the
+     * specified server and region
+     */
+    private Long getBackoff(ServerName server, byte[] regionName) {
+      ServerStatisticTracker tracker = AsyncProcess.this.connection.getStatisticsTracker();
+      ServerStatistics stats = tracker.getStats(server);
+      return AsyncProcess.this.connection.getBackoffPolicy()
+          .getBackoffTime(server, regionName, stats);
+    }
+
     /**
      * Starts waiting to issue replica calls on a different thread; or issues them immediately.
      */
@@ -1169,6 +1235,13 @@ class AsyncProcess {
               ++failed;
             }
           } else {
+            // update the stats about the region, if it's a user table. We don't want to slow down
+            // updates to meta tables, especially from internal updates (master, etc).
+            if (AsyncProcess.this.connection.getStatisticsTracker() != null) {
+              result = ResultStatsUtil.updateStats(result,
+                  AsyncProcess.this.connection.getStatisticsTracker(), server, regionName);
+            }
+
             if (callback != null) {
               try {
                 //noinspection unchecked
@@ -1498,7 +1571,6 @@ class AsyncProcess {
     }
   }
 
-
   @VisibleForTesting
   /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */
   protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
index 8989725..45b99eb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
@@ -288,5 +289,14 @@ public interface ClusterConnection extends HConnection {
    * @return true if this is a managed connection.
    */
   boolean isManaged();
-}
 
+  /**
+   * @return the current statistics tracker associated with this connection
+   */
+  ServerStatisticTracker getStatisticsTracker();
+
+  /**
+   * @return the configured client backoff policy
+   */
+  ClientBackoffPolicy getBackoffPolicy();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
index 0011328..53c1271 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
@@ -447,4 +448,14 @@ abstract class ConnectionAdapter implements ClusterConnection {
   public boolean isManaged() {
     return wrappedConnection.isManaged();
   }
+
+  @Override
+  public ServerStatisticTracker getStatisticsTracker() {
+    return wrappedConnection.getStatisticsTracker();
+  }
+
+  @Override
+  public ClientBackoffPolicy getBackoffPolicy() {
+    return wrappedConnection.getBackoffPolicy();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index f813ebd..acb64c8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -64,6 +64,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
@@ -541,6 +543,8 @@ class ConnectionManager {
     final int rpcTimeout;
     private NonceGenerator nonceGenerator = null;
     private final AsyncProcess asyncProcess;
+    // single tracker per connection
+    private final ServerStatisticTracker stats;
 
     private volatile boolean closed;
     private volatile boolean aborted;
@@ -596,6 +600,8 @@ class ConnectionManager {
      */
      Registry registry;
 
+    private final ClientBackoffPolicy backoffPolicy;
+
      HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
        this(conf, managed, null, null);
      }
@@ -670,9 +676,11 @@ class ConnectionManager {
       } else {
         this.nonceGenerator = new NoNonceGenerator();
       }
+      stats = ServerStatisticTracker.create(conf);
       this.asyncProcess = createAsyncProcess(this.conf);
       this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
-      this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor);
+      this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
+      this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
     }
 
     @Override
@@ -2207,7 +2215,8 @@ class ConnectionManager {
     protected AsyncProcess createAsyncProcess(Configuration conf) {
       // No default pool available.
       return new AsyncProcess(this, conf, this.batchPool,
-          RpcRetryingCallerFactory.instantiate(conf), false, RpcControllerFactory.instantiate(conf));
+          RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false,
+          RpcControllerFactory.instantiate(conf));
     }
 
     @Override
@@ -2215,6 +2224,16 @@ class ConnectionManager {
       return asyncProcess;
     }
 
+    @Override
+    public ServerStatisticTracker getStatisticsTracker() {
+      return this.stats;
+    }
+
+    @Override
+    public ClientBackoffPolicy getBackoffPolicy() {
+      return this.backoffPolicy;
+    }
+
     /*
      * Return the number of cached region for a table. It will only be called
      * from a unit test.
@@ -2506,7 +2525,8 @@ class ConnectionManager {
 
     @Override
     public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
-      return RpcRetryingCallerFactory.instantiate(conf, this.interceptor);
+      return RpcRetryingCallerFactory
+          .instantiate(conf, this.interceptor, this.getStatisticsTracker());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java
new file mode 100644
index 0000000..83c73b6
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper for a runnable for a group of actions for a single regionserver.
+ * <p>
+ * This can be used to build up the actions that should be taken, and then run them after the
+ * configured delay.
+ * </p>
+ * <p>
+ * This class exists to simulate using a ScheduledExecutorService with just a regular
+ * ExecutorService and Runnables. It is used for legacy reasons in the client; it could only be
+ * removed if we change the expectations in HTable around the pool the client is able to pass in,
+ * and even if we deprecate the current APIs we would need to keep this class around in the
+ * interim to bridge between the legacy ExecutorServices and the scheduled pool.
+ * </p>
+ */
+@InterfaceAudience.Private
+public class DelayingRunner<T> implements Runnable {
+  private static final Log LOG = LogFactory.getLog(DelayingRunner.class);
+
+  private final Object sleepLock = new Object();
+  private boolean triggerWake = false;
+  private long sleepTime;
+  private MultiAction<T> actions = new MultiAction<T>();
+  private Runnable runnable;
+
+  public DelayingRunner(long sleepTime, Map.Entry<byte[], List<Action<T>>> e) {
+    this.sleepTime = sleepTime;
+    add(e);
+  }
+
+  public void setRunner(Runnable runner) {
+    this.runnable = runner;
+  }
+
+  @Override
+  public void run() {
+    if (!sleep()) {
+      LOG.warn(
+          "Interrupted while sleeping for expected sleep time " + sleepTime + " ms");
+    }
+    //TODO maybe we should consider switching to a listenableFuture for the actual callable and
+    // then handling the results/errors as callbacks. That way we can decrement outstanding tasks
+    // even if we get interrupted here, but for now, we still need to run so we decrement the
+    // outstanding tasks
+    this.runnable.run();
+  }
+
+  /**
+   * Sleep for an expected amount of time.
+   * <p>
+   * This is nearly a copy of what the Sleeper does, but with the ability to know if you
+   * got interrupted while sleeping.
+   * </p>
+   *
+   * @return <tt>true</tt> if the sleep completed entirely successfully,
+   * <tt>false</tt> if the sleep was interrupted.
+   */
+  private boolean sleep() {
+    long now = EnvironmentEdgeManager.currentTime();
+    long startTime = now;
+    long waitTime = sleepTime;
+    while (waitTime > 0) {
+      long woke = -1;
+      try {
+        synchronized (sleepLock) {
+          if (triggerWake) break;
+          sleepLock.wait(waitTime);
+        }
+        woke = EnvironmentEdgeManager.currentTime();
+      } catch (InterruptedException iex) {
+        return false;
+      }
+      // Recalculate waitTime.
+      woke = (woke == -1) ? EnvironmentEdgeManager.currentTime() : woke;
+      waitTime = waitTime - (woke - startTime);
+    }
+    return true;
+  }
+
+  public void add(Map.Entry<byte[], List<Action<T>>> e) {
+    actions.add(e.getKey(), e.getValue());
+  }
+
+  public MultiAction<T> getActions() {
+    return actions;
+  }
+
+  public long getSleepTime() {
+    return sleepTime;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index fd0470a..0508fce 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -1887,8 +1887,9 @@ public class HTable implements HTableInterface, RegionLocator {
 
     AsyncProcess asyncProcess =
         new AsyncProcess(connection, configuration, pool,
-            RpcRetryingCallerFactory.instantiate(configuration), true,
-            RpcControllerFactory.instantiate(configuration));
+            RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
+            true, RpcControllerFactory.instantiate(configuration));
+
     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
         new Callback<ClientProtos.CoprocessorServiceResult>() {
           @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
index 8f1dc4d..16ab852 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.client;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -68,12 +69,24 @@ public final class MultiAction<R> {
    * @param a
    */
   public void add(byte[] regionName, Action<R> a) {
+    add(regionName, Arrays.asList(a));
+  }
+
+  /**
+   * Add actions to this container based on their regionName. If the regionName
+   * is wrong, the initial execution will fail, but will be automatically
+   * retried after looking up the correct region.
+   *
+   * @param regionName name of the region the actions target
+   * @param actionList list of actions to add for the region
+   */
+  public void add(byte[] regionName, List<Action<R>> actionList){
     List<Action<R>> rsActions = actions.get(regionName);
     if (rsActions == null) {
-      rsActions = new ArrayList<Action<R>>();
+      rsActions = new ArrayList<Action<R>>(actionList.size());
       actions.put(regionName, rsActions);
     }
-    rsActions.add(a);
+    rsActions.addAll(actionList);
   }
 
   public void setNonceGroup(long nonceGroup) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
index 8303121..08d9b80 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -94,6 +95,7 @@ public class Result implements CellScannable, CellScanner {
    * Index for where we are when Result is acting as a {@link CellScanner}.
    */
   private int cellScannerIndex = INITIAL_CELLSCANNER_INDEX;
+  private ClientProtos.RegionLoadStats stats;
 
   /**
    * Creates an empty Result w/ no KeyValue payload; returns null if you call {@link #rawCells()}.
@@ -794,4 +796,20 @@ public class Result implements CellScannable, CellScanner {
   public boolean isStale() {
     return stale;
   }
-}
+
+  /**
+   * Add load information about the region to the information about the result
+   * @param loadStats statistics about the current region from which this was returned
+   */
+  public void addResults(ClientProtos.RegionLoadStats loadStats) {
+    this.stats = loadStats;
+  }
+
+  /**
+   * @return the associated statistics about the region from which this was returned. Can be
+   * <tt>null</tt> if stats are disabled.
+   */
+  public ClientProtos.RegionLoadStats getStats() {
+    return stats;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
new file mode 100644
index 0000000..3caa63e
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+
+/**
+ * Utility for recording the server/region statistics carried on a {@link Result}
+ */
+@InterfaceAudience.Private
+public final class ResultStatsUtil {
+
+  private ResultStatsUtil() {
+    //private ctor for util class
+  }
+
+  /**
+   * Update the stats for the specified region if the result is an instance of
+   * {@link Result}
+   *
+   * @param r object that contains the result and possibly the statistics about the region
+   * @param serverStats stats tracker to update from the result
+   * @param server server from which the result was obtained
+   * @param regionName full region name for the stats.
+   * @return the passed result, unchanged; when it is a {@link Result} carrying stats, those
+   * stats are recorded against the given region first
+   */
+  public static <T> T updateStats(T r, ServerStatisticTracker serverStats,
+      ServerName server, byte[] regionName) {
+    if (!(r instanceof Result)) {
+      return r;
+    }
+    Result result = (Result) r;
+    // early exit if there are no stats to collect
+    ClientProtos.RegionLoadStats stats = result.getStats();
+    if(stats == null){
+      return r;
+    }
+
+    if (regionName != null) {
+      serverStats.updateRegionStats(server, regionName, stats);
+    }
+
+    return r;
+  }
+
+  public static <T> T updateStats(T r, ServerStatisticTracker stats,
+      HRegionLocation regionLocation) {
+    byte[] regionName = null;
+    ServerName server = null;
+    if (regionLocation != null) {
+      server = regionLocation.getServerName();
+      regionName = regionLocation.getRegionInfo().getRegionName();
+    }
+
+    return updateStats(r, stats, server, regionName);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
index a2c4d99..807c227 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,93 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.client;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.ExceptionUtil;
-import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 
-import com.google.protobuf.ServiceException;
+import java.io.IOException;
 
 /**
- * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client
- * threadlocal outstanding timeouts as so we don't persist too much.
- * Dynamic rather than static so can set the generic appropriately.
  *
- * This object has a state. It should not be used by in parallel by different threads.
- * Reusing it is possible however, even between multiple threads. However, the user will
- *  have to manage the synchronization on its side: there is no synchronization inside the class.
  */
-@InterfaceAudience.Private
-public class RpcRetryingCaller<T> {
-  public static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class);
-  /**
-   * When we started making calls.
-   */
-  private long globalStartTime;
-  /**
-   * Start and end times for a single call.
-   */
-  private final static int MIN_RPC_TIMEOUT = 2000;
-  /** How many retries are allowed before we start to log */
-  private final int startLogErrorsCnt;
-
-  private final long pause;
-  private final int retries;
-  private final AtomicBoolean cancelled = new AtomicBoolean(false);
-  private final RetryingCallerInterceptor interceptor;
-  private final RetryingCallerInterceptorContext context;
-
-  public RpcRetryingCaller(long pause, int retries, int startLogErrorsCnt) {
-    this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt);
-  }
-  
-  public RpcRetryingCaller(long pause, int retries,
-      RetryingCallerInterceptor interceptor, int startLogErrorsCnt) {
-    this.pause = pause;
-    this.retries = retries;
-    this.interceptor = interceptor;
-    context = interceptor.createEmptyContext();
-    this.startLogErrorsCnt = startLogErrorsCnt;
-  }
-
-  private int getRemainingTime(int callTimeout) {
-    if (callTimeout <= 0) {
-      return 0;
-    } else {
-      if (callTimeout == Integer.MAX_VALUE) return Integer.MAX_VALUE;
-      int remainingTime = (int) (callTimeout -
-          (EnvironmentEdgeManager.currentTime() - this.globalStartTime));
-      if (remainingTime < MIN_RPC_TIMEOUT) {
-        // If there is no time left, we're trying anyway. It's too late.
-        // 0 means no timeout, and it's not the intent here. So we secure both cases by
-        // resetting to the minimum.
-        remainingTime = MIN_RPC_TIMEOUT;
-      }
-      return remainingTime;
-    }
-  }
-  
-  public void cancel(){
-    cancelled.set(true);
-    synchronized (cancelled){
-      cancelled.notifyAll();
-    }
-  }
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface RpcRetryingCaller<T> {
+  void cancel();
 
   /**
    * Retries if invocation fails.
@@ -112,75 +38,8 @@ public class RpcRetryingCaller<T> {
    * @throws IOException if a remote or network exception occurs
    * @throws RuntimeException other unspecified error
    */
-  public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
-  throws IOException, RuntimeException {
-    List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
-      new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
-    this.globalStartTime = EnvironmentEdgeManager.currentTime();
-    context.clear();
-    for (int tries = 0;; tries++) {
-      long expectedSleep;
-      try {
-        callable.prepare(tries != 0); // if called with false, check table status on ZK
-        interceptor.intercept(context.prepare(callable, tries));
-        return callable.call(getRemainingTime(callTimeout));
-      } catch (PreemptiveFastFailException e) {
-        throw e;
-      } catch (Throwable t) {
-        ExceptionUtil.rethrowIfInterrupt(t);
-        if (tries > startLogErrorsCnt) {
-          LOG.info("Call exception, tries=" + tries + ", retries=" + retries + ", started=" +
-              (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, "
-              + "cancelled=" + cancelled.get() + ", msg="
-              + callable.getExceptionMessageAdditionalDetail());
-        }
-
-        // translateException throws exception when should not retry: i.e. when request is bad.
-        interceptor.handleFailure(context, t);
-        t = translateException(t);
-        callable.throwable(t, retries != 1);
-        RetriesExhaustedException.ThrowableWithExtraContext qt =
-            new RetriesExhaustedException.ThrowableWithExtraContext(t,
-                EnvironmentEdgeManager.currentTime(), toString());
-        exceptions.add(qt);
-        if (tries >= retries - 1) {
-          throw new RetriesExhaustedException(tries, exceptions);
-        }
-        // If the server is dead, we need to wait a little before retrying, to give
-        //  a chance to the regions to be
-        // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
-        expectedSleep = callable.sleep(pause, tries + 1);
-
-        // If, after the planned sleep, there won't be enough time left, we stop now.
-        long duration = singleCallDuration(expectedSleep);
-        if (duration > callTimeout) {
-          String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration +
-              ": " + callable.getExceptionMessageAdditionalDetail();
-          throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
-        }
-      } finally {
-        interceptor.updateFailureInfo(context);
-      }
-      try {
-        if (expectedSleep > 0) {
-          synchronized (cancelled) {
-            if (cancelled.get()) return null;
-            cancelled.wait(expectedSleep);
-          }
-        }
-        if (cancelled.get()) return null;
-      } catch (InterruptedException e) {
-        throw new InterruptedIOException("Interrupted after " + tries + " tries  on " + retries);
-      }
-    }
-  }
-
-  /**
-   * @return Calculate how long a single call took
-   */
-  private long singleCallDuration(final long expectedSleep) {
-    return (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + expectedSleep;
-  }
+  T callWithRetries(RetryingCallable<T> callable, int callTimeout)
+  throws IOException, RuntimeException;
 
   /**
    * Call the server once only.
@@ -191,62 +50,6 @@ public class RpcRetryingCaller<T> {
    * @throws IOException if a remote or network exception occurs
    * @throws RuntimeException other unspecified error
    */
-  public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
-  throws IOException, RuntimeException {
-    // The code of this method should be shared with withRetries.
-    this.globalStartTime = EnvironmentEdgeManager.currentTime();
-    try {
-      callable.prepare(false);
-      return callable.call(callTimeout);
-    } catch (Throwable t) {
-      Throwable t2 = translateException(t);
-      ExceptionUtil.rethrowIfInterrupt(t2);
-      // It would be nice to clear the location cache here.
-      if (t2 instanceof IOException) {
-        throw (IOException)t2;
-      } else {
-        throw new RuntimeException(t2);
-      }
-    }
-  }
-  
-  /**
-   * Get the good or the remote exception if any, throws the DoNotRetryIOException.
-   * @param t the throwable to analyze
-   * @return the translated exception, if it's not a DoNotRetryIOException
-   * @throws DoNotRetryIOException - if we find it, we throw it instead of translating.
-   */
-  static Throwable translateException(Throwable t) throws DoNotRetryIOException {
-    if (t instanceof UndeclaredThrowableException) {
-      if (t.getCause() != null) {
-        t = t.getCause();
-      }
-    }
-    if (t instanceof RemoteException) {
-      t = ((RemoteException)t).unwrapRemoteException();
-    }
-    if (t instanceof LinkageError) {
-      throw new DoNotRetryIOException(t);
-    }
-    if (t instanceof ServiceException) {
-      ServiceException se = (ServiceException)t;
-      Throwable cause = se.getCause();
-      if (cause != null && cause instanceof DoNotRetryIOException) {
-        throw (DoNotRetryIOException)cause;
-      }
-      // Don't let ServiceException out; its rpc specific.
-      t = cause;
-      // t could be a RemoteException so go aaround again.
-      translateException(t);
-    } else if (t instanceof DoNotRetryIOException) {
-      throw (DoNotRetryIOException)t;
-    }
-    return t;
-  }
-
-  @Override
-  public String toString() {
-    return "RpcRetryingCaller{" + "globalStartTime=" + globalStartTime +
-        ", pause=" + pause + ", retries=" + retries + '}';
-  }
+  T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
+  throws IOException, RuntimeException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
index 9f05997..6f2760f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
@@ -35,6 +35,8 @@ public class RpcRetryingCallerFactory {
   private final int retries;
   private final RetryingCallerInterceptor interceptor;
   private final int startLogErrorsCnt;
+  private final boolean enableBackPressure;
+  private ServerStatisticTracker stats;
 
   public RpcRetryingCallerFactory(Configuration conf) {
     this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
@@ -49,27 +51,53 @@ public class RpcRetryingCallerFactory {
     startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
         AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
     this.interceptor = interceptor;
+    enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
+        HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
+  }
+
+  /**
+   * Set the tracker that should be used for tracking statistics about the server
+   */
+  public void setStatisticTracker(ServerStatisticTracker statisticTracker) {
+    this.stats = statisticTracker;
   }
 
   public <T> RpcRetryingCaller<T> newCaller() {
     // We store the values in the factory instance. This way, constructing new objects
     //  is cheap as it does not require parsing a complex structure.
-      return new RpcRetryingCaller<T>(pause, retries, interceptor, startLogErrorsCnt);
+    RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<T>(pause, retries, interceptor,
+        startLogErrorsCnt);
+
+    // wrap it with stats, if we are tracking them
+    if (enableBackPressure && this.stats != null) {
+      caller = new StatsTrackingRpcRetryingCaller<T>(caller, this.stats);
+    }
+
+    return caller;
   }
 
   public static RpcRetryingCallerFactory instantiate(Configuration configuration) {
-    return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
+    return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
   }
-  
+
   public static RpcRetryingCallerFactory instantiate(Configuration configuration,
-      RetryingCallerInterceptor interceptor) {
+      ServerStatisticTracker stats) {
+    return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats);
+  }
+
+  public static RpcRetryingCallerFactory instantiate(Configuration configuration,
+      RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) {
     String clazzName = RpcRetryingCallerFactory.class.getName();
     String rpcCallerFactoryClazz =
         configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
     if (rpcCallerFactoryClazz.equals(clazzName)) {
       return new RpcRetryingCallerFactory(configuration, interceptor);
     }
-    return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
-      new Class[] { Configuration.class }, new Object[] { configuration });
+    RpcRetryingCallerFactory factory = ReflectionUtils.instantiateWithCustomCtor(
+        rpcCallerFactoryClazz, new Class[] { Configuration.class }, new Object[] { configuration });
+
+    // setting for backwards compat with existing caller factories, rather than in the ctor
+    factory.setStatisticTracker(stats);
+    return factory;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
new file mode 100644
index 0000000..1d037bc
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
@@ -0,0 +1,238 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
+import org.apache.hadoop.ipc.RemoteException;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * Runs an rpc'ing {@link RetryingCallable}. Sets outstanding timeouts into the rpc client
+ * threadlocal so we don't persist too much. Dynamic rather than static so we can set the
+ * generic appropriately.
+ *
+ * This object has state. It should not be used in parallel by different threads.
+ * Reusing it is possible, however, even across multiple threads, but the user will
+ *  have to manage the synchronization on their side: there is no synchronization inside the class.
+ */
+@InterfaceAudience.Private
+public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
+  public static final Log LOG = LogFactory.getLog(RpcRetryingCallerImpl.class);
+  /**
+   * When we started making calls.
+   */
+  private long globalStartTime;
+  /**
+   * Start and end times for a single call.
+   */
+  private final static int MIN_RPC_TIMEOUT = 2000;
+  /** How many retries are allowed before we start to log */
+  private final int startLogErrorsCnt;
+
+  private final long pause;
+  private final int retries;
+  private final AtomicBoolean cancelled = new AtomicBoolean(false);
+  private final RetryingCallerInterceptor interceptor;
+  private final RetryingCallerInterceptorContext context;
+
+  public RpcRetryingCallerImpl(long pause, int retries, int startLogErrorsCnt) {
+    this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt);
+  }
+  
+  public RpcRetryingCallerImpl(long pause, int retries,
+      RetryingCallerInterceptor interceptor, int startLogErrorsCnt) {
+    this.pause = pause;
+    this.retries = retries;
+    this.interceptor = interceptor;
+    context = interceptor.createEmptyContext();
+    this.startLogErrorsCnt = startLogErrorsCnt;
+  }
+
+  private int getRemainingTime(int callTimeout) {
+    if (callTimeout <= 0) {
+      return 0;
+    } else {
+      if (callTimeout == Integer.MAX_VALUE) return Integer.MAX_VALUE;
+      int remainingTime = (int) (callTimeout -
+          (EnvironmentEdgeManager.currentTime() - this.globalStartTime));
+      if (remainingTime < MIN_RPC_TIMEOUT) {
+        // If there is no time left, we're trying anyway. It's too late.
+        // 0 means no timeout, and it's not the intent here. So we secure both cases by
+        // resetting to the minimum.
+        remainingTime = MIN_RPC_TIMEOUT;
+      }
+      return remainingTime;
+    }
+  }
+  
+  @Override
+  public void cancel(){
+    cancelled.set(true);
+    synchronized (cancelled){
+      cancelled.notifyAll();
+    }
+  }
+
+  @Override
+  public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
+  throws IOException, RuntimeException {
+    List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
+      new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
+    this.globalStartTime = EnvironmentEdgeManager.currentTime();
+    context.clear();
+    for (int tries = 0;; tries++) {
+      long expectedSleep;
+      try {
+        callable.prepare(tries != 0); // if called with false, check table status on ZK
+        interceptor.intercept(context.prepare(callable, tries));
+        return callable.call(getRemainingTime(callTimeout));
+      } catch (PreemptiveFastFailException e) {
+        throw e;
+      } catch (Throwable t) {
+        ExceptionUtil.rethrowIfInterrupt(t);
+        if (tries > startLogErrorsCnt) {
+          LOG.info("Call exception, tries=" + tries + ", retries=" + retries + ", started=" +
+              (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, "
+              + "cancelled=" + cancelled.get() + ", msg="
+              + callable.getExceptionMessageAdditionalDetail());
+        }
+
+        // translateException throws exception when should not retry: i.e. when request is bad.
+        interceptor.handleFailure(context, t);
+        t = translateException(t);
+        callable.throwable(t, retries != 1);
+        RetriesExhaustedException.ThrowableWithExtraContext qt =
+            new RetriesExhaustedException.ThrowableWithExtraContext(t,
+                EnvironmentEdgeManager.currentTime(), toString());
+        exceptions.add(qt);
+        if (tries >= retries - 1) {
+          throw new RetriesExhaustedException(tries, exceptions);
+        }
+        // If the server is dead, we need to wait a little before retrying, to give
+        //  the regions a chance to be reassigned to other servers.
+        // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
+        expectedSleep = callable.sleep(pause, tries + 1);
+
+        // If, after the planned sleep, there won't be enough time left, we stop now.
+        long duration = singleCallDuration(expectedSleep);
+        if (duration > callTimeout) {
+          String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration +
+              ": " + callable.getExceptionMessageAdditionalDetail();
+          throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
+        }
+      } finally {
+        interceptor.updateFailureInfo(context);
+      }
+      try {
+        if (expectedSleep > 0) {
+          synchronized (cancelled) {
+            if (cancelled.get()) return null;
+            cancelled.wait(expectedSleep);
+          }
+        }
+        if (cancelled.get()) return null;
+      } catch (InterruptedException e) {
+        throw new InterruptedIOException("Interrupted after " + tries + " tries  on " + retries);
+      }
+    }
+  }
+
+  /**
+   * @return how long a single call took so far, plus the planned sleep
+   */
+  private long singleCallDuration(final long expectedSleep) {
+    return (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + expectedSleep;
+  }
+
+  @Override
+  public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
+  throws IOException, RuntimeException {
+    // The code of this method should be shared with withRetries.
+    this.globalStartTime = EnvironmentEdgeManager.currentTime();
+    try {
+      callable.prepare(false);
+      return callable.call(callTimeout);
+    } catch (Throwable t) {
+      Throwable t2 = translateException(t);
+      ExceptionUtil.rethrowIfInterrupt(t2);
+      // It would be nice to clear the location cache here.
+      if (t2 instanceof IOException) {
+        throw (IOException)t2;
+      } else {
+        throw new RuntimeException(t2);
+      }
+    }
+  }
+  
+  /**
+   * Get the good or the remote exception if any, throws the DoNotRetryIOException.
+   * @param t the throwable to analyze
+   * @return the translated exception, if it's not a DoNotRetryIOException
+   * @throws DoNotRetryIOException - if we find it, we throw it instead of translating.
+   */
+  static Throwable translateException(Throwable t) throws DoNotRetryIOException {
+    if (t instanceof UndeclaredThrowableException) {
+      if (t.getCause() != null) {
+        t = t.getCause();
+      }
+    }
+    if (t instanceof RemoteException) {
+      t = ((RemoteException)t).unwrapRemoteException();
+    }
+    if (t instanceof LinkageError) {
+      throw new DoNotRetryIOException(t);
+    }
+    if (t instanceof ServiceException) {
+      ServiceException se = (ServiceException)t;
+      Throwable cause = se.getCause();
+      if (cause != null && cause instanceof DoNotRetryIOException) {
+        throw (DoNotRetryIOException)cause;
+      }
+      // Don't let ServiceException out; it's rpc specific.
+      t = cause;
+      // t could be a RemoteException so go around again.
+      translateException(t);
+    } else if (t instanceof DoNotRetryIOException) {
+      throw (DoNotRetryIOException)t;
+    }
+    return t;
+  }
+
+  @Override
+  public String toString() {
+    return "RpcRetryingCaller{" + "globalStartTime=" + globalStartTime +
+        ", pause=" + pause + ", retries=" + retries + '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
new file mode 100644
index 0000000..0c7b683
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Tracks the region statistics reported by each server this client talks to
+ */
+@InterfaceAudience.Private
+public class ServerStatisticTracker {
+
+  private final Map<ServerName, ServerStatistics> stats =
+      new ConcurrentHashMap<ServerName, ServerStatistics>();
+
+  public void updateRegionStats(ServerName server, byte[] region,
+      ClientProtos.RegionLoadStats currentStats) {
+    ServerStatistics stat = stats.get(server);
+
+    if (stat == null) {
+      // create a stats object and update the stats
+      synchronized (this) {
+        stat = stats.get(server);
+        // we don't have stats for that server yet, so we need to make some
+        if (stat == null) {
+          stat = new ServerStatistics();
+          stats.put(server, stat);
+        }
+      }
+    }
+    stat.update(region, currentStats);
+  }
+
+  public ServerStatistics getStats(ServerName server) {
+    return this.stats.get(server);
+  }
+
+  public static ServerStatisticTracker create(Configuration conf) {
+    if (!conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
+        HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE)) {
+      return null;
+    }
+    return new ServerStatisticTracker();
+  }
+
+  @VisibleForTesting
+  ServerStatistics getServerStatsForTesting(ServerName server) {
+    return stats.get(server);
+  }
+}
\ No newline at end of file
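
A rough usage sketch for the tracker (not part of the patch; the server name and region name are made up). Note that create(conf) returns null unless client backpressure is enabled:

    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
    ServerStatisticTracker tracker = ServerStatisticTracker.create(conf);
    ServerName server = ServerName.valueOf("rs1.example.com", 16020, 12345L);
    ClientProtos.RegionLoadStats load =
        ClientProtos.RegionLoadStats.newBuilder().setMemstoreLoad(60).build();
    tracker.updateRegionStats(server, Bytes.toBytes("someRegion"), load);
    ServerStatistics stats = tracker.getStats(server);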

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
new file mode 100644
index 0000000..cec0ee5
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import java.io.IOException;
+
+/**
+ * An {@link RpcRetryingCaller} that will update the per-region stats for the call on return,
+ * if stats are available
+ */
+@InterfaceAudience.Private
+public class StatsTrackingRpcRetryingCaller<T> implements RpcRetryingCaller<T> {
+  private final ServerStatisticTracker stats;
+  private final RpcRetryingCaller<T> delegate;
+
+  public StatsTrackingRpcRetryingCaller(RpcRetryingCaller<T> delegate,
+      ServerStatisticTracker stats) {
+    this.delegate = delegate;
+    this.stats = stats;
+  }
+
+  @Override
+  public void cancel() {
+    delegate.cancel();
+  }
+
+  @Override
+  public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
+      throws IOException, RuntimeException {
+    T result = delegate.callWithRetries(callable, callTimeout);
+    return updateStatsAndUnwrap(result, callable);
+  }
+
+  @Override
+  public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
+      throws IOException, RuntimeException {
+    T result = delegate.callWithoutRetries(callable, callTimeout);
+    return updateStatsAndUnwrap(result, callable);
+  }
+
+  private T updateStatsAndUnwrap(T result, RetryingCallable<T> callable) {
+    // don't track stats about requests that aren't to regionservers
+    if (!(callable instanceof RegionServerCallable)) {
+      return result;
+    }
+
+    // multi-server callables span multiple regions, so they don't have a single location;
+    // they are still region server callables, but their stats have to be handled when the
+    // result is processed, not here
+    if (callable instanceof MultiServerCallable) {
+      return result;
+    }
+
+    // update the stats for the single server callable
+    RegionServerCallable<T> regionCallable = (RegionServerCallable<T>) callable;
+    HRegionLocation location = regionCallable.getLocation();
+    return ResultStatsUtil.updateStats(result, stats, location);
+  }
+}
\ No newline at end of file
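
As a sketch (not part of the patch), the stats-tracking caller simply wraps an ordinary caller and feeds single-region results to the tracker; the constructor arguments here mirror the ones used by the tests further down:

    ServerStatisticTracker tracker = new ServerStatisticTracker();
    RpcRetryingCaller<Result> base = new RpcRetryingCallerImpl<Result>(100, 10, 9);
    RpcRetryingCaller<Result> caller =
        new StatsTrackingRpcRetryingCaller<Result>(base, tracker);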

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java
new file mode 100644
index 0000000..94e434f
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Configurable policy for how long a client should wait before sending a new request to the
+ * server, given the server's load statistics.
+ * <p>
+ * Implementations must have a single-argument constructor that takes a {@link org.apache.hadoop.conf.Configuration}
+ * </p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ClientBackoffPolicy {
+
+  public static final String BACKOFF_POLICY_CLASS =
+      "hbase.client.statistics.backoff-policy";
+
+  /**
+   * @return the number of ms the client should wait before sending the next request, based on the given server statistics
+   */
+  public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats);
+}
\ No newline at end of file
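
For illustration only, a user-supplied policy might look like the following. FixedBackoffPolicy and the "my.fixed.backoff.ms" key are made-up names; the single-argument Configuration constructor is the part the contract above requires:

    public class FixedBackoffPolicy implements ClientBackoffPolicy {
      private final long waitMs;

      public FixedBackoffPolicy(Configuration conf) {
        // hypothetical key; only the Configuration constructor itself is mandated
        this.waitMs = conf.getLong("my.fixed.backoff.ms", 100);
      }

      @Override
      public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) {
        // back off by a fixed amount whenever we have any stats for the server
        return stats == null ? 0 : waitMs;
      }
    }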

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java
new file mode 100644
index 0000000..879a0e2
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class ClientBackoffPolicyFactory {
+
+  private static final Log LOG = LogFactory.getLog(ClientBackoffPolicyFactory.class);
+
+  private ClientBackoffPolicyFactory() {
+  }
+
+  public static ClientBackoffPolicy create(Configuration conf) {
+    // create the backoff policy
+    String className =
+        conf.get(ClientBackoffPolicy.BACKOFF_POLICY_CLASS,
+            NoBackoffPolicy.class.getName());
+    return ReflectionUtils.instantiateWithCustomCtor(className,
+        new Class<?>[] { Configuration.class }, new Object[] { conf });
+  }
+
+  /**
+   * Default backoff policy that doesn't create any backoff for the client, regardless of load
+   */
+  public static class NoBackoffPolicy implements ClientBackoffPolicy {
+    public NoBackoffPolicy(Configuration conf){
+      // necessary to meet contract
+    }
+
+    @Override
+    public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) {
+      return 0;
+    }
+  }
+}
\ No newline at end of file
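
A sketch of how such a policy would be wired in through the factory above (FixedBackoffPolicy is the made-up class from the previous sketch); without the override, create(conf) falls back to NoBackoffPolicy, which always returns 0:

    Configuration conf = HBaseConfiguration.create();
    conf.set(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, FixedBackoffPolicy.class.getName());
    ClientBackoffPolicy policy = ClientBackoffPolicyFactory.create(conf);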

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java
new file mode 100644
index 0000000..6e75670
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Simple exponential backoff policy for the client that multiplies the max backoff by the
+ * fourth power of the region's memstore load fraction to generate the backoff time.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ExponentialClientBackoffPolicy implements ClientBackoffPolicy {
+
+  private static final Log LOG = LogFactory.getLog(ExponentialClientBackoffPolicy.class);
+
+  private static final long ONE_MINUTE = 60 * 1000;
+  public static final long DEFAULT_MAX_BACKOFF = 5 * ONE_MINUTE;
+  public static final String MAX_BACKOFF_KEY = "hbase.client.exponential-backoff.max";
+  private long maxBackoff;
+
+  public ExponentialClientBackoffPolicy(Configuration conf) {
+    this.maxBackoff = conf.getLong(MAX_BACKOFF_KEY, DEFAULT_MAX_BACKOFF);
+  }
+
+  @Override
+  public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) {
+    // no stats for the server yet, so don't backoff
+    if (stats == null) {
+      return 0;
+    }
+
+    ServerStatistics.RegionStatistics regionStats = stats.getStatsForRegion(region);
+    // no stats for the region yet - don't backoff
+    if (regionStats == null) {
+      return 0;
+    }
+
+    // raise the load fraction (a value between 0 and 1) to the fourth power. The closer we get
+    // to 100 percent load, the closer the fraction gets to 1, and the power gives the exponential curve
+    double percent = regionStats.getMemstoreLoadPercent() / 100.0;
+    double multiplier = Math.pow(percent, 4.0);
+    // shouldn't ever happen, but just in case something changes in the statistic data
+    if (multiplier > 1) {
+      LOG.warn("Got a backoff multiplier greater than 1; forcing it back " +
+          "down to the max backoff");
+      multiplier = 1;
+    }
+    return (long) (multiplier * maxBackoff);
+  }
+}
\ No newline at end of file
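
A back-of-the-envelope check of the curve above (a sketch, not part of the patch): with the default five minute max backoff, a 50% memstore load yields 0.5^4 * 300000 ms = 18750 ms, 80% yields about 122880 ms, and 100% pins the wait at the full 300000 ms. The server name is unused by this policy, so null is fine here:

    ExponentialClientBackoffPolicy policy =
        new ExponentialClientBackoffPolicy(new Configuration(false));
    ServerStatistics stats = new ServerStatistics();
    stats.update(Bytes.toBytes("r1"),
        ClientProtos.RegionLoadStats.newBuilder().setMemstoreLoad(50).build());
    long wait = policy.getBackoffTime(null, Bytes.toBytes("r1"), stats);  // 18750 ms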

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java
new file mode 100644
index 0000000..a3b8e11
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Track the per-region statistics for a single server
+ */
+@InterfaceAudience.Private
+public class ServerStatistics {
+
+  private final Map<byte[], RegionStatistics> stats =
+      new TreeMap<byte[], RegionStatistics>(Bytes.BYTES_COMPARATOR);
+
+  /**
+   * Update the stats for the given region. Not synchronized; last writer wins, and it doesn't
+   * really matter which writer gets to update, as long as something gets set.
+   * @param region name of the region the stats are for
+   * @param currentStats the latest stats reported by the server
+   */
+  public void update(byte[] region, ClientProtos.RegionLoadStats currentStats) {
+    RegionStatistics regionStat = this.stats.get(region);
+    if (regionStat == null) {
+      regionStat = new RegionStatistics();
+      this.stats.put(region, regionStat);
+    }
+
+    regionStat.update(currentStats);
+  }
+
+  @InterfaceAudience.Private
+  public RegionStatistics getStatsForRegion(byte[] regionName){
+    return stats.get(regionName);
+  }
+
+  public static class RegionStatistics{
+    private int memstoreLoad = 0;
+
+    public void update(ClientProtos.RegionLoadStats currentStats) {
+      this.memstoreLoad = currentStats.getMemstoreLoad();
+    }
+
+    public int getMemstoreLoadPercent(){
+      return this.memstoreLoad;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
index 8c75f4f..8433cee 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
@@ -61,7 +61,7 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
     this.connection = conn;
     this.table = table;
     this.row = row;
-    this.rpcFactory = RpcRetryingCallerFactory.instantiate(conn.getConfiguration());
+    this.rpcFactory = RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null);
     this.operationTimeout = conn.getConfiguration().getInt(
         HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
index 70da40c..1d42a82 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
@@ -114,17 +114,23 @@ public final class ResponseConverter {
       }
 
       for (ResultOrException roe : actionResult.getResultOrExceptionList()) {
+        Object responseValue;
         if (roe.hasException()) {
-          results.add(regionName, roe.getIndex(), ProtobufUtil.toException(roe.getException()));
+          responseValue = ProtobufUtil.toException(roe.getException());
         } else if (roe.hasResult()) {
-          results.add(regionName, roe.getIndex(), ProtobufUtil.toResult(roe.getResult(), cells));
+          responseValue = ProtobufUtil.toResult(roe.getResult(), cells);
+          // add the load stats, if we got any
+          if (roe.hasLoadStats()) {
+            ((Result) responseValue).addResults(roe.getLoadStats());
+          }
         } else if (roe.hasServiceResult()) {
-          results.add(regionName, roe.getIndex(), roe.getServiceResult());
+          responseValue = roe.getServiceResult();
         } else {
           // no result & no exception. Unexpected.
           throw new IllegalStateException("No result & no exception roe=" + roe +
               " for region " + actions.getRegion());
         }
+        results.add(regionName, roe.getIndex(), responseValue);
       }
     }
 
@@ -149,9 +155,11 @@ public final class ResponseConverter {
    * @param r
    * @return an action result builder
    */
-  public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r) {
+  public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r,
+      ClientProtos.RegionLoadStats stats) {
     ResultOrException.Builder builder = ResultOrException.newBuilder();
     if (r != null) builder.setResult(r);
+    if(stats != null) builder.setLoadStats(stats);
     return builder;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index d219638..88a95fb 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -190,7 +190,7 @@ public class TestAsyncProcess {
             }
           });
 
-      return new RpcRetryingCaller<MultiResponse>(100, 10, 9) {
+      return new RpcRetryingCallerImpl<MultiResponse>(100, 10, 9) {
         @Override
         public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
                                                 int callTimeout)
@@ -208,7 +208,7 @@ public class TestAsyncProcess {
     }
   }
 
-  static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{
+  static class CallerWithFailure extends RpcRetryingCallerImpl<MultiResponse>{
 
     public CallerWithFailure() {
       super(100, 100, 9);
@@ -294,7 +294,7 @@ public class TestAsyncProcess {
         replicaCalls.incrementAndGet();
       }
 
-      return new RpcRetryingCaller<MultiResponse>(100, 10, 9) {
+      return new RpcRetryingCallerImpl<MultiResponse>(100, 10, 9) {
         @Override
         public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
         throws IOException, RuntimeException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java
new file mode 100644
index 0000000..88e409d
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
+import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestClientExponentialBackoff {
+
+  ServerName server = Mockito.mock(ServerName.class);
+  byte[] regionname = Bytes.toBytes("region");
+
+  @Test
+  public void testNulls() {
+    Configuration conf = new Configuration(false);
+    ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf);
+    assertEquals(0, backoff.getBackoffTime(null, null, null));
+
+    // server name doesn't matter to the calculation, but check it now anyway
+    assertEquals(0, backoff.getBackoffTime(server, null, null));
+    assertEquals(0, backoff.getBackoffTime(server, regionname, null));
+
+    // check when no stats for the region yet
+    ServerStatistics stats = new ServerStatistics();
+    assertEquals(0, backoff.getBackoffTime(server, regionname, stats));
+  }
+
+  @Test
+  public void testMaxLoad() {
+    Configuration conf = new Configuration(false);
+    ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf);
+
+    ServerStatistics stats = new ServerStatistics();
+    update(stats, 100);
+    assertEquals(ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF, backoff.getBackoffTime(server,
+        regionname, stats));
+
+    // another policy with a different max timeout
+    long max = 100;
+    conf.setLong(ExponentialClientBackoffPolicy.MAX_BACKOFF_KEY, max);
+    ExponentialClientBackoffPolicy backoffShortTimeout = new ExponentialClientBackoffPolicy(conf);
+    assertEquals(max, backoffShortTimeout.getBackoffTime(server, regionname, stats));
+
+    // test beyond 100 still doesn't exceed the max
+    update(stats, 101);
+    assertEquals(ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF, backoff.getBackoffTime(server,
+        regionname, stats));
+    assertEquals(max, backoffShortTimeout.getBackoffTime(server, regionname, stats));
+
+    // and that when we are below 100, it's less than the max timeout
+    update(stats, 99);
+    assertTrue(backoff.getBackoffTime(server,
+        regionname, stats) < ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF);
+    assertTrue(backoffShortTimeout.getBackoffTime(server, regionname, stats) < max);
+  }
+
+  /**
+   * Make sure that we get results in the order that we expect - backoff for a load of 1 should
+   * be less than backoff for 10, which should be less than that for 50.
+   */
+  @Test
+  public void testResultOrdering() {
+    Configuration conf = new Configuration(false);
+    // make the max timeout really high so we get differentiation between load factors
+    conf.setLong(ExponentialClientBackoffPolicy.MAX_BACKOFF_KEY, Integer.MAX_VALUE);
+    ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf);
+
+    ServerStatistics stats = new ServerStatistics();
+    long previous = backoff.getBackoffTime(server, regionname, stats);
+    for (int i = 1; i <= 100; i++) {
+      update(stats, i);
+      long next = backoff.getBackoffTime(server, regionname, stats);
+      assertTrue(
+          "Previous backoff time" + previous + " >= " + next + ", the next backoff time for " +
+              "load " + i, previous < next);
+      previous = next;
+    }
+  }
+
+  private void update(ServerStatistics stats, int load) {
+    ClientProtos.RegionLoadStats stat = ClientProtos.RegionLoadStats.newBuilder()
+        .setMemstoreLoad(load)
+        .build();
+    stats.update(regionname, stat);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java
index 080cd8b..e82e59d 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java
@@ -564,7 +564,7 @@ public class TestFastFailWithoutTestUtil {
 
   public RpcRetryingCaller<Void> getRpcRetryingCaller(int pauseTime,
       int retries, RetryingCallerInterceptor interceptor) {
-    return new RpcRetryingCaller<Void>(pauseTime, retries, interceptor, 9) {
+    return new RpcRetryingCallerImpl<Void>(pauseTime, retries, interceptor, 9) {
       @Override
       public Void callWithRetries(RetryingCallable<Void> callable,
           int callTimeout) throws IOException, RuntimeException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 6001767..33b71ad 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1099,6 +1099,12 @@ public final class HConstants {
   public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL =
       "hbase.client.fast.fail.interceptor.impl"; 
 
+  /** Config key for whether the server should send backpressure (region load stats) and
+   * whether the client should listen to that backpressure from the server */
+  public static final String ENABLE_CLIENT_BACKPRESSURE = "hbase.client.backpressure.enabled";
+  public static final boolean DEFAULT_ENABLE_CLIENT_BACKPRESSURE = false;
+
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }