Posted to commits@hbase.apache.org by jy...@apache.org on 2014/12/16 20:12:47 UTC

[1/2] hbase git commit: HBASE-5162 Basic client pushback mechanism

Repository: hbase
Updated Branches:
  refs/heads/master e5d813c46 -> a411227b0


http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
index c36662e..ab86e1e 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
@@ -26210,6 +26210,482 @@ public final class ClientProtos {
     // @@protoc_insertion_point(class_scope:RegionAction)
   }
 
+  public interface RegionLoadStatsOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // optional int32 memstoreLoad = 1 [default = 0];
+    /**
+     * <code>optional int32 memstoreLoad = 1 [default = 0];</code>
+     *
+     * <pre>
+     * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+     * </pre>
+     */
+    boolean hasMemstoreLoad();
+    /**
+     * <code>optional int32 memstoreLoad = 1 [default = 0];</code>
+     *
+     * <pre>
+     * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+     * </pre>
+     */
+    int getMemstoreLoad();
+  }
+  /**
+   * Protobuf type {@code RegionLoadStats}
+   *
+   * <pre>
+   *
+   * Statistics about the current load on the region
+   * </pre>
+   */
+  public static final class RegionLoadStats extends
+      com.google.protobuf.GeneratedMessage
+      implements RegionLoadStatsOrBuilder {
+    // Use RegionLoadStats.newBuilder() to construct.
+    private RegionLoadStats(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private RegionLoadStats(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final RegionLoadStats defaultInstance;
+    public static RegionLoadStats getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public RegionLoadStats getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private RegionLoadStats(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              memstoreLoad_ = input.readInt32();
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.class, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<RegionLoadStats> PARSER =
+        new com.google.protobuf.AbstractParser<RegionLoadStats>() {
+      public RegionLoadStats parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new RegionLoadStats(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<RegionLoadStats> getParserForType() {
+      return PARSER;
+    }
+
+    private int bitField0_;
+    // optional int32 memstoreLoad = 1 [default = 0];
+    public static final int MEMSTORELOAD_FIELD_NUMBER = 1;
+    private int memstoreLoad_;
+    /**
+     * <code>optional int32 memstoreLoad = 1 [default = 0];</code>
+     *
+     * <pre>
+     * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+     * </pre>
+     */
+    public boolean hasMemstoreLoad() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional int32 memstoreLoad = 1 [default = 0];</code>
+     *
+     * <pre>
+     * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+     * </pre>
+     */
+    public int getMemstoreLoad() {
+      return memstoreLoad_;
+    }
+
+    private void initFields() {
+      memstoreLoad_ = 0;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeInt32(1, memstoreLoad_);
+      }
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(1, memstoreLoad_);
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    @java.lang.Override
+    public boolean equals(final java.lang.Object obj) {
+      if (obj == this) {
+       return true;
+      }
+      if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats)) {
+        return super.equals(obj);
+      }
+      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats other = (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats) obj;
+
+      boolean result = true;
+      result = result && (hasMemstoreLoad() == other.hasMemstoreLoad());
+      if (hasMemstoreLoad()) {
+        result = result && (getMemstoreLoad()
+            == other.getMemstoreLoad());
+      }
+      result = result &&
+          getUnknownFields().equals(other.getUnknownFields());
+      return result;
+    }
+
+    private int memoizedHashCode = 0;
+    @java.lang.Override
+    public int hashCode() {
+      if (memoizedHashCode != 0) {
+        return memoizedHashCode;
+      }
+      int hash = 41;
+      hash = (19 * hash) + getDescriptorForType().hashCode();
+      if (hasMemstoreLoad()) {
+        hash = (37 * hash) + MEMSTORELOAD_FIELD_NUMBER;
+        hash = (53 * hash) + getMemstoreLoad();
+      }
+      hash = (29 * hash) + getUnknownFields().hashCode();
+      memoizedHashCode = hash;
+      return hash;
+    }
+
+    public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code RegionLoadStats}
+     *
+     * <pre>
+     *
+     * Statistics about the current load on the region
+     * </pre>
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.class, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder.class);
+      }
+
+      // Construct using org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        memstoreLoad_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_descriptor;
+      }
+
+      public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats getDefaultInstanceForType() {
+        return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance();
+      }
+
+      public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats build() {
+        org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats buildPartial() {
+        org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats result = new org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.memstoreLoad_ = memstoreLoad_;
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats) {
+          return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats other) {
+        if (other == org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance()) return this;
+        if (other.hasMemstoreLoad()) {
+          setMemstoreLoad(other.getMemstoreLoad());
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats) e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+      private int bitField0_;
+
+      // optional int32 memstoreLoad = 1 [default = 0];
+      private int memstoreLoad_ ;
+      /**
+       * <code>optional int32 memstoreLoad = 1 [default = 0];</code>
+       *
+       * <pre>
+       * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+       * </pre>
+       */
+      public boolean hasMemstoreLoad() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>optional int32 memstoreLoad = 1 [default = 0];</code>
+       *
+       * <pre>
+       * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+       * </pre>
+       */
+      public int getMemstoreLoad() {
+        return memstoreLoad_;
+      }
+      /**
+       * <code>optional int32 memstoreLoad = 1 [default = 0];</code>
+       *
+       * <pre>
+       * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+       * </pre>
+       */
+      public Builder setMemstoreLoad(int value) {
+        bitField0_ |= 0x00000001;
+        memstoreLoad_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 memstoreLoad = 1 [default = 0];</code>
+       *
+       * <pre>
+       * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+       * </pre>
+       */
+      public Builder clearMemstoreLoad() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        memstoreLoad_ = 0;
+        onChanged();
+        return this;
+      }
+
+      // @@protoc_insertion_point(builder_scope:RegionLoadStats)
+    }
+
+    static {
+      defaultInstance = new RegionLoadStats(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:RegionLoadStats)
+  }
+
   public interface ResultOrExceptionOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
 
@@ -26286,6 +26762,32 @@ public final class ClientProtos {
      * </pre>
      */
     org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResultOrBuilder getServiceResultOrBuilder();
+
+    // optional .RegionLoadStats loadStats = 5;
+    /**
+     * <code>optional .RegionLoadStats loadStats = 5;</code>
+     *
+     * <pre>
+     * current load on the region
+     * </pre>
+     */
+    boolean hasLoadStats();
+    /**
+     * <code>optional .RegionLoadStats loadStats = 5;</code>
+     *
+     * <pre>
+     * current load on the region
+     * </pre>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats getLoadStats();
+    /**
+     * <code>optional .RegionLoadStats loadStats = 5;</code>
+     *
+     * <pre>
+     * current load on the region
+     * </pre>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder getLoadStatsOrBuilder();
   }
   /**
    * Protobuf type {@code ResultOrException}
@@ -26389,6 +26891,19 @@ public final class ClientProtos {
               bitField0_ |= 0x00000008;
               break;
             }
+            case 42: {
+              org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder subBuilder = null;
+              if (((bitField0_ & 0x00000010) == 0x00000010)) {
+                subBuilder = loadStats_.toBuilder();
+              }
+              loadStats_ = input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.PARSER, extensionRegistry);
+              if (subBuilder != null) {
+                subBuilder.mergeFrom(loadStats_);
+                loadStats_ = subBuilder.buildPartial();
+              }
+              bitField0_ |= 0x00000010;
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -26533,11 +27048,46 @@ public final class ClientProtos {
       return serviceResult_;
     }
 
+    // optional .RegionLoadStats loadStats = 5;
+    public static final int LOADSTATS_FIELD_NUMBER = 5;
+    private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats loadStats_;
+    /**
+     * <code>optional .RegionLoadStats loadStats = 5;</code>
+     *
+     * <pre>
+     * current load on the region
+     * </pre>
+     */
+    public boolean hasLoadStats() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional .RegionLoadStats loadStats = 5;</code>
+     *
+     * <pre>
+     * current load on the region
+     * </pre>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats getLoadStats() {
+      return loadStats_;
+    }
+    /**
+     * <code>optional .RegionLoadStats loadStats = 5;</code>
+     *
+     * <pre>
+     * current load on the region
+     * </pre>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder getLoadStatsOrBuilder() {
+      return loadStats_;
+    }
+
     private void initFields() {
       index_ = 0;
       result_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Result.getDefaultInstance();
       exception_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair.getDefaultInstance();
       serviceResult_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResult.getDefaultInstance();
+      loadStats_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -26575,6 +27125,9 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
         output.writeMessage(4, serviceResult_);
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeMessage(5, loadStats_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -26600,6 +27153,10 @@ public final class ClientProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(4, serviceResult_);
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(5, loadStats_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -26643,6 +27200,11 @@ public final class ClientProtos {
         result = result && getServiceResult()
             .equals(other.getServiceResult());
       }
+      result = result && (hasLoadStats() == other.hasLoadStats());
+      if (hasLoadStats()) {
+        result = result && getLoadStats()
+            .equals(other.getLoadStats());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -26672,6 +27234,10 @@ public final class ClientProtos {
         hash = (37 * hash) + SERVICE_RESULT_FIELD_NUMBER;
         hash = (53 * hash) + getServiceResult().hashCode();
       }
+      if (hasLoadStats()) {
+        hash = (37 * hash) + LOADSTATS_FIELD_NUMBER;
+        hash = (53 * hash) + getLoadStats().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -26783,6 +27349,7 @@ public final class ClientProtos {
           getResultFieldBuilder();
           getExceptionFieldBuilder();
           getServiceResultFieldBuilder();
+          getLoadStatsFieldBuilder();
         }
       }
       private static Builder create() {
@@ -26811,6 +27378,12 @@ public final class ClientProtos {
           serviceResultBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000008);
+        if (loadStatsBuilder_ == null) {
+          loadStats_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance();
+        } else {
+          loadStatsBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000010);
         return this;
       }
 
@@ -26867,6 +27440,14 @@ public final class ClientProtos {
         } else {
           result.serviceResult_ = serviceResultBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        if (loadStatsBuilder_ == null) {
+          result.loadStats_ = loadStats_;
+        } else {
+          result.loadStats_ = loadStatsBuilder_.build();
+        }
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -26895,6 +27476,9 @@ public final class ClientProtos {
         if (other.hasServiceResult()) {
           mergeServiceResult(other.getServiceResult());
         }
+        if (other.hasLoadStats()) {
+          mergeLoadStats(other.getLoadStats());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -27374,6 +27958,159 @@ public final class ClientProtos {
         return serviceResultBuilder_;
       }
 
+      // optional .RegionLoadStats loadStats = 5;
+      private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats loadStats_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance();
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder> loadStatsBuilder_;
+      /**
+       * <code>optional .RegionLoadStats loadStats = 5;</code>
+       *
+       * <pre>
+       * current load on the region
+       * </pre>
+       */
+      public boolean hasLoadStats() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      /**
+       * <code>optional .RegionLoadStats loadStats = 5;</code>
+       *
+       * <pre>
+       * current load on the region
+       * </pre>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats getLoadStats() {
+        if (loadStatsBuilder_ == null) {
+          return loadStats_;
+        } else {
+          return loadStatsBuilder_.getMessage();
+        }
+      }
+      /**
+       * <code>optional .RegionLoadStats loadStats = 5;</code>
+       *
+       * <pre>
+       * current load on the region
+       * </pre>
+       */
+      public Builder setLoadStats(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats value) {
+        if (loadStatsBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          loadStats_ = value;
+          onChanged();
+        } else {
+          loadStatsBuilder_.setMessage(value);
+        }
+        bitField0_ |= 0x00000010;
+        return this;
+      }
+      /**
+       * <code>optional .RegionLoadStats loadStats = 5;</code>
+       *
+       * <pre>
+       * current load on the region
+       * </pre>
+       */
+      public Builder setLoadStats(
+          org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder builderForValue) {
+        if (loadStatsBuilder_ == null) {
+          loadStats_ = builderForValue.build();
+          onChanged();
+        } else {
+          loadStatsBuilder_.setMessage(builderForValue.build());
+        }
+        bitField0_ |= 0x00000010;
+        return this;
+      }
+      /**
+       * <code>optional .RegionLoadStats loadStats = 5;</code>
+       *
+       * <pre>
+       * current load on the region
+       * </pre>
+       */
+      public Builder mergeLoadStats(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats value) {
+        if (loadStatsBuilder_ == null) {
+          if (((bitField0_ & 0x00000010) == 0x00000010) &&
+              loadStats_ != org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance()) {
+            loadStats_ =
+              org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.newBuilder(loadStats_).mergeFrom(value).buildPartial();
+          } else {
+            loadStats_ = value;
+          }
+          onChanged();
+        } else {
+          loadStatsBuilder_.mergeFrom(value);
+        }
+        bitField0_ |= 0x00000010;
+        return this;
+      }
+      /**
+       * <code>optional .RegionLoadStats loadStats = 5;</code>
+       *
+       * <pre>
+       * current load on the region
+       * </pre>
+       */
+      public Builder clearLoadStats() {
+        if (loadStatsBuilder_ == null) {
+          loadStats_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance();
+          onChanged();
+        } else {
+          loadStatsBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000010);
+        return this;
+      }
+      /**
+       * <code>optional .RegionLoadStats loadStats = 5;</code>
+       *
+       * <pre>
+       * current load on the region
+       * </pre>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder getLoadStatsBuilder() {
+        bitField0_ |= 0x00000010;
+        onChanged();
+        return getLoadStatsFieldBuilder().getBuilder();
+      }
+      /**
+       * <code>optional .RegionLoadStats loadStats = 5;</code>
+       *
+       * <pre>
+       * current load on the region
+       * </pre>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder getLoadStatsOrBuilder() {
+        if (loadStatsBuilder_ != null) {
+          return loadStatsBuilder_.getMessageOrBuilder();
+        } else {
+          return loadStats_;
+        }
+      }
+      /**
+       * <code>optional .RegionLoadStats loadStats = 5;</code>
+       *
+       * <pre>
+       * current load on the region
+       * </pre>
+       */
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder> 
+          getLoadStatsFieldBuilder() {
+        if (loadStatsBuilder_ == null) {
+          loadStatsBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+              org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder>(
+                  loadStats_,
+                  getParentForChildren(),
+                  isClean());
+          loadStats_ = null;
+        }
+        return loadStatsBuilder_;
+      }
+
       // @@protoc_insertion_point(builder_scope:ResultOrException)
     }
 
@@ -31067,6 +31804,11 @@ public final class ClientProtos {
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
       internal_static_RegionAction_fieldAccessorTable;
   private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_RegionLoadStats_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_RegionLoadStats_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
     internal_static_ResultOrException_descriptor;
   private static
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
@@ -31180,31 +31922,33 @@ public final class ClientProtos {
       "\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004 \001(\0132\027.Cop" +
       "rocessorServiceCall\"Y\n\014RegionAction\022 \n\006r" +
       "egion\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006atomic\030" +
-      "\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action\"\221\001\n\021Resul" +
-      "tOrException\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 " +
-      "\001(\0132\007.Result\022!\n\texception\030\003 \001(\0132\016.NameBy" +
-      "tesPair\0221\n\016service_result\030\004 \001(\0132\031.Coproc" +
-      "essorServiceResult\"f\n\022RegionActionResult",
-      "\022-\n\021resultOrException\030\001 \003(\0132\022.ResultOrEx" +
-      "ception\022!\n\texception\030\002 \001(\0132\016.NameBytesPa" +
-      "ir\"f\n\014MultiRequest\022#\n\014regionAction\030\001 \003(\013" +
-      "2\r.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tc" +
-      "ondition\030\003 \001(\0132\n.Condition\"S\n\rMultiRespo" +
-      "nse\022/\n\022regionActionResult\030\001 \003(\0132\023.Region" +
-      "ActionResult\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consi" +
-      "stency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rCli" +
-      "entService\022 \n\003Get\022\013.GetRequest\032\014.GetResp" +
-      "onse\022)\n\006Mutate\022\016.MutateRequest\032\017.MutateR",
-      "esponse\022#\n\004Scan\022\014.ScanRequest\032\r.ScanResp" +
-      "onse\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileReq" +
-      "uest\032\026.BulkLoadHFileResponse\022F\n\013ExecServ" +
-      "ice\022\032.CoprocessorServiceRequest\032\033.Coproc" +
-      "essorServiceResponse\022R\n\027ExecRegionServer" +
-      "Service\022\032.CoprocessorServiceRequest\032\033.Co" +
-      "processorServiceResponse\022&\n\005Multi\022\r.Mult" +
-      "iRequest\032\016.MultiResponseBB\n*org.apache.h" +
-      "adoop.hbase.protobuf.generatedB\014ClientPr" +
-      "otosH\001\210\001\001\240\001\001"
+      "\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action\"*\n\017Region" +
+      "LoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\"\266\001\n\021R" +
+      "esultOrException\022\r\n\005index\030\001 \001(\r\022\027\n\006resul" +
+      "t\030\002 \001(\0132\007.Result\022!\n\texception\030\003 \001(\0132\016.Na" +
+      "meBytesPair\0221\n\016service_result\030\004 \001(\0132\031.Co",
+      "processorServiceResult\022#\n\tloadStats\030\005 \001(" +
+      "\0132\020.RegionLoadStats\"f\n\022RegionActionResul" +
+      "t\022-\n\021resultOrException\030\001 \003(\0132\022.ResultOrE" +
+      "xception\022!\n\texception\030\002 \001(\0132\016.NameBytesP" +
+      "air\"f\n\014MultiRequest\022#\n\014regionAction\030\001 \003(" +
+      "\0132\r.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\t" +
+      "condition\030\003 \001(\0132\n.Condition\"S\n\rMultiResp" +
+      "onse\022/\n\022regionActionResult\030\001 \003(\0132\023.Regio" +
+      "nActionResult\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Cons" +
+      "istency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rCl",
+      "ientService\022 \n\003Get\022\013.GetRequest\032\014.GetRes" +
+      "ponse\022)\n\006Mutate\022\016.MutateRequest\032\017.Mutate" +
+      "Response\022#\n\004Scan\022\014.ScanRequest\032\r.ScanRes" +
+      "ponse\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileRe" +
+      "quest\032\026.BulkLoadHFileResponse\022F\n\013ExecSer" +
+      "vice\022\032.CoprocessorServiceRequest\032\033.Copro" +
+      "cessorServiceResponse\022R\n\027ExecRegionServe" +
+      "rService\022\032.CoprocessorServiceRequest\032\033.C" +
+      "oprocessorServiceResponse\022&\n\005Multi\022\r.Mul" +
+      "tiRequest\032\016.MultiResponseBB\n*org.apache.",
+      "hadoop.hbase.protobuf.generatedB\014ClientP" +
+      "rotosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -31361,26 +32105,32 @@ public final class ClientProtos {
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_RegionAction_descriptor,
               new java.lang.String[] { "Region", "Atomic", "Action", });
-          internal_static_ResultOrException_descriptor =
+          internal_static_RegionLoadStats_descriptor =
             getDescriptor().getMessageTypes().get(22);
+          internal_static_RegionLoadStats_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_RegionLoadStats_descriptor,
+              new java.lang.String[] { "MemstoreLoad", });
+          internal_static_ResultOrException_descriptor =
+            getDescriptor().getMessageTypes().get(23);
           internal_static_ResultOrException_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_ResultOrException_descriptor,
-              new java.lang.String[] { "Index", "Result", "Exception", "ServiceResult", });
+              new java.lang.String[] { "Index", "Result", "Exception", "ServiceResult", "LoadStats", });
           internal_static_RegionActionResult_descriptor =
-            getDescriptor().getMessageTypes().get(23);
+            getDescriptor().getMessageTypes().get(24);
           internal_static_RegionActionResult_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_RegionActionResult_descriptor,
               new java.lang.String[] { "ResultOrException", "Exception", });
           internal_static_MultiRequest_descriptor =
-            getDescriptor().getMessageTypes().get(24);
+            getDescriptor().getMessageTypes().get(25);
           internal_static_MultiRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_MultiRequest_descriptor,
               new java.lang.String[] { "RegionAction", "NonceGroup", "Condition", });
           internal_static_MultiResponse_descriptor =
-            getDescriptor().getMessageTypes().get(25);
+            getDescriptor().getMessageTypes().get(26);
           internal_static_MultiResponse_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_MultiResponse_descriptor,

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-protocol/src/main/protobuf/Client.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto
index ede1c26..1a3c43e 100644
--- a/hbase-protocol/src/main/protobuf/Client.proto
+++ b/hbase-protocol/src/main/protobuf/Client.proto
@@ -353,6 +353,14 @@ message RegionAction {
   repeated Action action = 3;
 }
 
+/**
+ * Statistics about the current load on the region
+ */
+message RegionLoadStats {
+  // percent load on the memstore. Guaranteed to be positive, between 0 and 100
+  optional int32 memstoreLoad = 1 [default = 0];
+}
+
 /**
  * Either a Result or an Exception NameBytesPair (keyed by
  * exception name whose value is the exception stringified)
@@ -366,6 +374,8 @@ message ResultOrException {
   optional NameBytesPair exception = 3;
   // result if this was a coprocessor service call
   optional CoprocessorServiceResult service_result = 4;
+  // current load on the region
+  optional RegionLoadStats loadStats = 5;
 }
 
 /**

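As a quick orientation (not part of the patch), the sketch below exercises the
generated API for the new message and field; every call it makes appears in the
generated ClientProtos code earlier in this diff.

import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;

public class RegionLoadStatsExample {
  public static void main(String[] args) {
    // Server side: attach the current memstore load to a per-action result.
    ClientProtos.RegionLoadStats stats = ClientProtos.RegionLoadStats.newBuilder()
        .setMemstoreLoad(42) // percent load, 0-100
        .build();
    ResultOrException roe = ResultOrException.newBuilder()
        .setLoadStats(stats)
        .build();

    // Client side: loadStats is optional, so check presence before reading it.
    if (roe.hasLoadStats()) {
      System.out.println("memstore load: " + roe.getLoadStats().getMemstoreLoad() + "%");
    }
  }
}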
http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index b4b6adc..31be7ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -690,7 +690,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     try {
       List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
       Configuration conf = getConf();
-      boolean success = RpcRetryingCallerFactory.instantiate(conf).<Boolean> newCaller()
+      boolean success = RpcRetryingCallerFactory.instantiate(conf,
+          null).<Boolean> newCaller()
           .callWithRetries(svrCallable, Integer.MAX_VALUE);
       if (!success) {
         LOG.warn("Attempt to bulk load region containing "

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 95ba337..428e857 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.TableName;
@@ -122,6 +123,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -544,6 +546,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   private final MetricsRegion metricsRegion;
   private final MetricsRegionWrapperImpl metricsRegionWrapper;
   private final Durability durability;
+  private final boolean regionStatsEnabled;
 
   /**
    * HRegion constructor. This constructor should only be used for testing and
@@ -685,6 +688,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
           HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
     configurationManager = Optional.absent();
+
+    // disable stats tracking for system tables, but check the config for everything else
+    this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals(
+        NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR) ?
+          false :
+          conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
+              HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
   }
 
   void setHTableSpecificConf() {
@@ -5187,18 +5197,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     return results;
   }
 
-  public void mutateRow(RowMutations rm) throws IOException {
+  public ClientProtos.RegionLoadStats mutateRow(RowMutations rm) throws IOException {
     // Don't need nonces here - RowMutations only supports puts and deletes
-    mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
+    return mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
   }
 
   /**
    * Perform atomic mutations within the region w/o nonces.
    * See {@link #mutateRowsWithLocks(Collection, Collection, long, long)}
    */
-  public void mutateRowsWithLocks(Collection<Mutation> mutations,
+  public ClientProtos.RegionLoadStats mutateRowsWithLocks(Collection<Mutation> mutations,
       Collection<byte[]> rowsToLock) throws IOException {
-    mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
+    return mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
   }
 
   /**
@@ -5213,10 +5223,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * <code>rowsToLock</code> is sorted in order to avoid deadlocks.
    * @throws IOException
    */
-  public void mutateRowsWithLocks(Collection<Mutation> mutations,
+  public ClientProtos.RegionLoadStats mutateRowsWithLocks(Collection<Mutation> mutations,
       Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
     MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
     processRowsWithLocks(proc, -1, nonceGroup, nonce);
+    return getRegionStats();
+  }
+
+  /**
+   * @return the current load statistics for the region
+   */
+  public ClientProtos.RegionLoadStats getRegionStats() {
+    if (!regionStatsEnabled) {
+      return null;
+    }
+    ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
+    stats.setMemstoreLoad((int) Math.min(100,
+        (this.memstoreSize.get() * 100) / this.memstoreFlushSize));
+    return stats.build();
   }
 
   /**

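To make the arithmetic in getRegionStats() above concrete, here is a worked
example with illustrative numbers (the sizes are not taken from the patch):

// Load is the memstore's current size as a percentage of its flush
// threshold, clamped so the reported value never exceeds 100.
long memstoreSize = 192L * 1024 * 1024;      // 192 MB currently in the memstore
long memstoreFlushSize = 128L * 1024 * 1024; // flush threshold
int load = (int) Math.min(100, (memstoreSize * 100) / memstoreFlushSize);
// (192 * 100) / 128 = 150, clamped to 100 -- a region past its flush
// threshold always reports 100% memstore load.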
http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 97c6987..32d59d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -257,8 +257,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   }
 
   private static ResultOrException getResultOrException(
-      final ClientProtos.Result r, final int index) {
-    return getResultOrException(ResponseConverter.buildActionResult(r), index);
+      final ClientProtos.Result r, final int index, final ClientProtos.RegionLoadStats stats) {
+    return getResultOrException(ResponseConverter.buildActionResult(r, stats), index);
   }
 
   private static ResultOrException getResultOrException(final Exception e, final int index) {
@@ -355,7 +355,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * @param cellScanner if non-null, the mutation data -- the Cell content.
    * @throws IOException
    */
-  private void mutateRows(final HRegion region, final List<ClientProtos.Action> actions,
+  private ClientProtos.RegionLoadStats mutateRows(final HRegion region,
+      final List<ClientProtos.Action> actions,
       final CellScanner cellScanner) throws IOException {
     if (!region.getRegionInfo().isMetaTable()) {
       regionServer.cacheFlusher.reclaimMemStoreMemory();
@@ -381,7 +382,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
       }
     }
-    region.mutateRow(rm);
+    return region.mutateRow(rm);
   }
 
   /**
@@ -661,7 +662,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
           case SUCCESS:
             builder.addResultOrException(getResultOrException(
-              ClientProtos.Result.getDefaultInstance(), index));
+              ClientProtos.Result.getDefaultInstance(), index, region.getRegionStats()));
             break;
         }
       }
@@ -1815,6 +1816,12 @@
             processed = checkAndRowMutate(region, regionAction.getActionList(),
                   cellScanner, row, family, qualifier, compareOp, comparator);
           } else {
+            ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(),
+                cellScanner);
+            // add the stats to the response
+            if(stats != null) {
+              responseBuilder.addRegionActionResult(RegionActionResult.newBuilder()
+                  .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats)));
+            }
-            mutateRows(region, regionAction.getActionList(), cellScanner);
             processed = Boolean.TRUE;
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index ff5f2f5..59a1b43 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -162,7 +162,7 @@ public class WALEditsReplaySink {
   private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
       final List<Entry> entries) throws IOException {
     try {
-      RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf);
+      RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, null);
       ReplayServerCallable<ReplicateWALEntryResponse> callable =
           new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
               regionInfo, entries);

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index a99b047..998cdf0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -134,7 +134,7 @@ public class HConnectionTestingUtility {
     Mockito.doNothing().when(c).decCount();
     Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
         RpcRetryingCallerFactory.instantiate(conf,
-            RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR));
+            RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null));
     HTableInterface t = Mockito.mock(HTableInterface.class);
     Mockito.when(c.getTable((TableName)Mockito.any())).thenReturn(t);
     ResultScanner rs = Mockito.mock(ResultScanner.class);

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
new file mode 100644
index 0000000..dfb9a70
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Test that we can actually send and use region metrics to slow down client writes
+ */
+@Category(MediumTests.class)
+public class TestClientPushback {
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final byte[] tableName = Bytes.toBytes("client-pushback");
+  private static final byte[] family = Bytes.toBytes("f");
+  private static final byte[] qualifier = Bytes.toBytes("q");
+  private static long flushSizeBytes = 1024;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception{
+    Configuration conf = UTIL.getConfiguration();
+    // enable backpressure
+    conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
+    // turn the memstore size way down so we don't need to write a lot to see changes in memstore
+    // load
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
+        flushSizeBytes);
+    // ensure we block the flushes once we reach double that flush size
+    conf.setLong("hbase.hregion.memstore.block.multiplier", 2);
+
+    UTIL.startMiniCluster();
+    UTIL.createTable(tableName, family);
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws Exception{
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testClientTracksServerPushback() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    TableName tablename = TableName.valueOf(tableName);
+    HTable table = new HTable(conf, tablename);
+    //make sure we flush after each put
+    table.setAutoFlushTo(true);
+
+    // write some data
+    Put p = new Put(Bytes.toBytes("row"));
+    p.add(family, qualifier, Bytes.toBytes("value1"));
+    table.put(p);
+
+    // get the stats for the region hosting our table
+    ClusterConnection conn = ConnectionManager.getConnectionInternal(conf);
+    ServerStatisticTracker stats = conn.getStatisticsTracker();
+    assertNotNull("No stats configured for the client!", stats);
+    // get the names so we can query the stats
+    ServerName server = UTIL.getHBaseCluster().getRegionServer(0).getServerName();
+    byte[] regionName = UTIL.getHBaseCluster().getRegionServer(0)
+        .getOnlineRegions(tablename).get(0).getRegionName();
+
+    // check to see we found some load on the memstore
+    ServerStatistics serverStats = stats.getServerStatsForTesting(server);
+    ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName);
+    assertEquals(15, regionStats.getMemstoreLoadPercent());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 837e37b..3d1b1c8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -76,7 +76,7 @@ public class TestReplicasClient {
   private static final Log LOG = LogFactory.getLog(TestReplicasClient.class);
 
   static {
-    ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)RpcRetryingCallerImpl.LOG).getLogger().setLevel(Level.ALL);
   }
 
   private static final int NB_SERVERS = 1;

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
index f5380ef..7ca12f0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.RpcRetryingCaller;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerImpl;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -74,7 +75,7 @@ public class TestRegionReplicaReplicationEndpoint {
   private static final Log LOG = LogFactory.getLog(TestRegionReplicaReplicationEndpoint.class);
 
   static {
-    ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) RpcRetryingCallerImpl.LOG).getLogger().setLevel(Level.ALL);
   }
 
   private static final int NB_SERVERS = 2;


[2/2] hbase git commit: HBASE-5162 Basic client pushback mechanism

Posted by jy...@apache.org.
HBASE-5162 Basic client pushback mechanism

Instead of just blocking the client for 90 seconds when the region gets too
busy, the server now sends region load stats back to the client so the client
can know how busy the server is. Currently it's just the load on the memstore,
but it can be extended to other stats (e.g. CPU, general memory, etc.).

It is then up to the client to decide if it wants to act on these stats. By
default the client ignores them, but it can easily be toggled to the built-in
exponential back-off, or users can plug in their own back-off
implementations.
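
As a quick illustration (a minimal sketch, not part of the patch; it assumes
only the HConstants key and the factories added in this change set), turning
the feature on from client code looks like:

    Configuration conf = HBaseConfiguration.create();
    // off by default; opt in to client-side backoff driven by server load stats
    conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
    // connections built from this conf create a ServerStatisticTracker and ask
    // the policy from ClientBackoffPolicyFactory.create(conf) how long to wait
    // before each multi request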


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a411227b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a411227b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a411227b

Branch: refs/heads/master
Commit: a411227b0ebf78b4ee8ae7179e162b54734e77de
Parents: e5d813c
Author: Jesse Yates <je...@gmail.com>
Authored: Tue Oct 28 16:14:16 2014 -0700
Committer: Jesse Yates <jy...@apache.org>
Committed: Tue Dec 16 11:14:30 2014 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       | 120 ++-
 .../hadoop/hbase/client/ClusterConnection.java  |  12 +-
 .../hadoop/hbase/client/ConnectionAdapter.java  |  11 +
 .../hadoop/hbase/client/ConnectionManager.java  |  26 +-
 .../hadoop/hbase/client/DelayingRunner.java     | 116 +++
 .../org/apache/hadoop/hbase/client/HTable.java  |   5 +-
 .../apache/hadoop/hbase/client/MultiAction.java |  17 +-
 .../org/apache/hadoop/hbase/client/Result.java  |  20 +-
 .../hadoop/hbase/client/ResultStatsUtil.java    |  76 ++
 .../hadoop/hbase/client/RpcRetryingCaller.java  | 217 +----
 .../hbase/client/RpcRetryingCallerFactory.java  |  42 +-
 .../hbase/client/RpcRetryingCallerImpl.java     | 238 ++++++
 .../hbase/client/ServerStatisticTracker.java    |  74 ++
 .../client/StatsTrackingRpcRetryingCaller.java  |  77 ++
 .../client/backoff/ClientBackoffPolicy.java     |  42 +
 .../backoff/ClientBackoffPolicyFactory.java     |  59 ++
 .../backoff/ExponentialClientBackoffPolicy.java |  71 ++
 .../hbase/client/backoff/ServerStatistics.java  |  68 ++
 .../hbase/ipc/RegionCoprocessorRpcChannel.java  |   2 +-
 .../hbase/protobuf/ResponseConverter.java       |  16 +-
 .../hadoop/hbase/client/TestAsyncProcess.java   |   6 +-
 .../client/TestClientExponentialBackoff.java    | 110 +++
 .../client/TestFastFailWithoutTestUtil.java     |   2 +-
 .../org/apache/hadoop/hbase/HConstants.java     |   6 +
 .../hbase/protobuf/generated/ClientProtos.java  | 810 ++++++++++++++++++-
 hbase-protocol/src/main/protobuf/Client.proto   |  10 +
 .../hbase/mapreduce/LoadIncrementalHFiles.java  |   3 +-
 .../hadoop/hbase/regionserver/HRegion.java      |  34 +-
 .../hbase/regionserver/RSRpcServices.java       |  18 +-
 .../regionserver/wal/WALEditsReplaySink.java    |   2 +-
 .../hbase/client/HConnectionTestingUtility.java |   2 +-
 .../hadoop/hbase/client/TestClientPushback.java |  94 +++
 .../hadoop/hbase/client/TestReplicasClient.java |   2 +-
 .../TestRegionReplicaReplicationEndpoint.java   |   3 +-
 34 files changed, 2110 insertions(+), 301 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index e0c14a6..8b1db8f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -313,7 +315,8 @@ class AsyncProcess {
    * Uses default ExecutorService for this AP (must have been created with one).
    */
   public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
-      boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults) throws InterruptedIOException {
+      boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults)
+      throws InterruptedIOException {
     return submit(null, tableName, rows, atLeastOne, callback, needResults);
   }
 
@@ -374,7 +377,7 @@ class AsyncProcess {
           locationErrors = new ArrayList<Exception>();
           locationErrorRows = new ArrayList<Integer>();
           LOG.error("Failed to get region location ", ex);
-          // This action failed before creating ars. Add it to retained but do not add to submit list.
+          // This action failed before creating ars. Retain it, but do not add to submit list.
           // We will then add it to ars in an already-failed state.
           retainedActions.add(new Action<Row>(r, ++posInList));
           locationErrors.add(ex);
@@ -918,14 +921,12 @@ class AsyncProcess {
       return loc;
     }
 
-
-
     /**
      * Send a multi action structure to the servers, after a delay depending on the attempt
      * number. Asynchronous.
      *
      * @param actionsByServer the actions structured by regions
-     * @param numAttempt      the attempt number.
+     * @param numAttempt the attempt number.
      * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
      */
     private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
@@ -935,33 +936,98 @@ class AsyncProcess {
       int actionsRemaining = actionsByServer.size();
       // This iteration is by server (the HRegionLocation comparator is by server portion only).
       for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
-        final ServerName server = e.getKey();
-        final MultiAction<Row> multiAction = e.getValue();
+        ServerName server = e.getKey();
+        MultiAction<Row> multiAction = e.getValue();
         incTaskCounters(multiAction.getRegions(), server);
-        Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction",
-            new SingleServerRequestRunnable(multiAction, numAttempt, server));
-        if ((--actionsRemaining == 0) && reuseThread) {
-          runnable.run();
-        } else {
-          try {
-            pool.submit(runnable);
-          } catch (RejectedExecutionException ree) {
-            // This should never happen. But as the pool is provided by the end user, let's secure
-            //  this a little.
-            decTaskCounters(multiAction.getRegions(), server);
-            LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
-                " Server is " + server.getServerName(), ree);
-            // We're likely to fail again, but this will increment the attempt counter, so it will
-            //  finish.
-            receiveGlobalFailure(multiAction, server, numAttempt, ree);
+        Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
+            numAttempt);
+        // make sure we correctly count the number of runnables before we try to reuse the send
+        // thread, in case we had to split the request into different runnables because of backoff
+        if (runnables.size() > actionsRemaining) {
+          actionsRemaining = runnables.size();
+        }
+
+        // run all the runnables
+        for (Runnable runnable : runnables) {
+          if ((--actionsRemaining == 0) && reuseThread) {
+            runnable.run();
+          } else {
+            try {
+              pool.submit(runnable);
+            } catch (RejectedExecutionException ree) {
+              // This should never happen. But as the pool is provided by the end user, let's secure
+              //  this a little.
+              decTaskCounters(multiAction.getRegions(), server);
+              LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
+                  " Server is " + server.getServerName(), ree);
+              // We're likely to fail again, but this will increment the attempt counter, so it will
+              //  finish.
+              receiveGlobalFailure(multiAction, server, numAttempt, ree);
+            }
           }
         }
       }
+
       if (actionsForReplicaThread != null) {
         startWaitingForReplicaCalls(actionsForReplicaThread);
       }
     }
 
+    private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
+        MultiAction<Row> multiAction,
+        int numAttempt) {
+      // no stats to manage, just do the standard action
+      if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
+        return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
+            new SingleServerRequestRunnable(multiAction, numAttempt, server)));
+      }
+
+      // group the actions by the amount of delay
+      Map<Long, DelayingRunner> actions =
+          new HashMap<Long, DelayingRunner>(multiAction.size());
+
+      // split up the actions
+      for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) {
+        Long backoff = getBackoff(server, e.getKey());
+        DelayingRunner runner = actions.get(backoff);
+        if (runner == null) {
+          actions.put(backoff, new DelayingRunner(backoff, e));
+        } else {
+          runner.add(e);
+        }
+      }
+
+      List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
+      for (DelayingRunner runner : actions.values()) {
+        String traceText = "AsyncProcess.sendMultiAction";
+        Runnable runnable =
+            new SingleServerRequestRunnable(runner.getActions(), numAttempt, server);
+        // use a delay runner only if we need to sleep for some time
+        if (runner.getSleepTime() > 0) {
+          runner.setRunner(runnable);
+          traceText = "AsyncProcess.clientBackoff.sendMultiAction";
+          runnable = runner;
+        }
+        runnable = Trace.wrap(traceText, runnable);
+        toReturn.add(runnable);
+      }
+      return toReturn;
+    }
+
+    /**
+     * @param server server location where the target region is hosted
+     * @param regionName name of the region to which we are going to write some data
+     * @return the amount of time the client should wait before it submits a request to
+     * the specified server and region
+     */
+    private Long getBackoff(ServerName server, byte[] regionName) {
+      ServerStatisticTracker tracker = AsyncProcess.this.connection.getStatisticsTracker();
+      ServerStatistics stats = tracker.getStats(server);
+      return AsyncProcess.this.connection.getBackoffPolicy()
+          .getBackoffTime(server, regionName, stats);
+    }
+
     /**
      * Starts waiting to issue replica calls on a different thread; or issues them immediately.
      */
@@ -1169,6 +1235,13 @@ class AsyncProcess {
               ++failed;
             }
           } else {
+            // update the stats about the region, if it's a user table. We don't want to slow down
+            // updates to meta tables, especially from internal updates (master, etc).
+            if (AsyncProcess.this.connection.getStatisticsTracker() != null) {
+              result = ResultStatsUtil.updateStats(result,
+                  AsyncProcess.this.connection.getStatisticsTracker(), server, regionName);
+            }
+
             if (callback != null) {
               try {
                 //noinspection unchecked
@@ -1498,7 +1571,6 @@ class AsyncProcess {
     }
   }
 
-
   @VisibleForTesting
   /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */
   protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
index 8989725..45b99eb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
@@ -288,5 +289,14 @@ public interface ClusterConnection extends HConnection {
    * @return true if this is a managed connection.
    */
   boolean isManaged();
-}
 
+  /**
+   * @return the current statistics tracker associated with this connection
+   */
+  ServerStatisticTracker getStatisticsTracker();
+
+  /**
+   * @return the configured client backoff policy
+   */
+  ClientBackoffPolicy getBackoffPolicy();
+}
\ No newline at end of file
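
A hedged sketch of how the two new accessors combine (the names are from this
patch; the null check matters because the tracker is only created when
backpressure is enabled):

    // assumes a ClusterConnection 'conn', a ServerName 'server', and a
    // byte[] 'regionName' are in scope
    ServerStatisticTracker tracker = conn.getStatisticsTracker();
    if (tracker != null) {
      ServerStatistics stats = tracker.getStats(server);
      long waitMs = conn.getBackoffPolicy().getBackoffTime(server, regionName, stats);
    }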

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
index 0011328..53c1271 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
@@ -447,4 +448,14 @@ abstract class ConnectionAdapter implements ClusterConnection {
   public boolean isManaged() {
     return wrappedConnection.isManaged();
   }
+
+  @Override
+  public ServerStatisticTracker getStatisticsTracker() {
+    return wrappedConnection.getStatisticsTracker();
+  }
+
+  @Override
+  public ClientBackoffPolicy getBackoffPolicy() {
+    return wrappedConnection.getBackoffPolicy();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index f813ebd..acb64c8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -64,6 +64,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
@@ -541,6 +543,8 @@ class ConnectionManager {
     final int rpcTimeout;
     private NonceGenerator nonceGenerator = null;
     private final AsyncProcess asyncProcess;
+    // single tracker per connection
+    private final ServerStatisticTracker stats;
 
     private volatile boolean closed;
     private volatile boolean aborted;
@@ -596,6 +600,8 @@ class ConnectionManager {
      */
      Registry registry;
 
+    private final ClientBackoffPolicy backoffPolicy;
+
      HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
        this(conf, managed, null, null);
      }
@@ -670,9 +676,11 @@ class ConnectionManager {
       } else {
         this.nonceGenerator = new NoNonceGenerator();
       }
+      stats = ServerStatisticTracker.create(conf);
       this.asyncProcess = createAsyncProcess(this.conf);
       this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
-      this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor);
+      this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
+      this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
     }
 
     @Override
@@ -2207,7 +2215,8 @@ class ConnectionManager {
     protected AsyncProcess createAsyncProcess(Configuration conf) {
       // No default pool available.
       return new AsyncProcess(this, conf, this.batchPool,
-          RpcRetryingCallerFactory.instantiate(conf), false, RpcControllerFactory.instantiate(conf));
+          RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false,
+          RpcControllerFactory.instantiate(conf));
     }
 
     @Override
@@ -2215,6 +2224,16 @@ class ConnectionManager {
       return asyncProcess;
     }
 
+    @Override
+    public ServerStatisticTracker getStatisticsTracker() {
+      return this.stats;
+    }
+
+    @Override
+    public ClientBackoffPolicy getBackoffPolicy() {
+      return this.backoffPolicy;
+    }
+
     /*
      * Return the number of cached region for a table. It will only be called
      * from a unit test.
@@ -2506,7 +2525,8 @@ class ConnectionManager {
 
     @Override
     public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
-      return RpcRetryingCallerFactory.instantiate(conf, this.interceptor);
+      return RpcRetryingCallerFactory
+          .instantiate(conf, this.interceptor, this.getStatisticsTracker());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java
new file mode 100644
index 0000000..83c73b6
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper for a runnable for a group of actions for a single regionserver.
+ * <p>
+ * This can be used to build up the actions that should be taken and then run them with a
+ * delay.
+ * </p>
+ * <p>
+ * This class exists to simulate using a ScheduledExecutorService with just a regular
+ * ExecutorService and Runnables. It is used for legacy reasons in the client; it could only
+ * be removed if we changed the expectations in HTable around the pool the client is able to
+ * pass in. Even if we deprecated the current APIs, we would need to keep this class around
+ * in the interim to bridge between the legacy ExecutorServices and the scheduled pool.
+ * </p>
+ */
+@InterfaceAudience.Private
+public class DelayingRunner<T> implements Runnable {
+  private static final Log LOG = LogFactory.getLog(DelayingRunner.class);
+
+  private final Object sleepLock = new Object();
+  private boolean triggerWake = false;
+  private long sleepTime;
+  private MultiAction<T> actions = new MultiAction<T>();
+  private Runnable runnable;
+
+  public DelayingRunner(long sleepTime, Map.Entry<byte[], List<Action<T>>> e) {
+    this.sleepTime = sleepTime;
+    add(e);
+  }
+
+  public void setRunner(Runnable runner) {
+    this.runnable = runner;
+  }
+
+  @Override
+  public void run() {
+    if (!sleep()) {
+      LOG.warn("Interrupted while sleeping for expected sleep time " + sleepTime + " ms");
+    }
+    //TODO maybe we should consider switching to a listenableFuture for the actual callable and
+    // then handling the results/errors as callbacks. That way we can decrement outstanding tasks
+    // even if we get interrupted here; but for now we still need to run so that the count of
+    // outstanding tasks is decremented
+    this.runnable.run();
+  }
+
+  /**
+   * Sleep for an expected amount of time.
+   * <p>
+   * This is nearly a copy of what the Sleeper does, but with the ability to know if you
+   * got interrupted while sleeping.
+   * </p>
+   *
+   * @return <tt>true</tt> if the sleep completed entirely successfully,
+   * <tt>false</tt> if the sleep was interrupted.
+   */
+  private boolean sleep() {
+    long now = EnvironmentEdgeManager.currentTime();
+    long startTime = now;
+    long waitTime = sleepTime;
+    while (waitTime > 0) {
+      long woke = -1;
+      try {
+        synchronized (sleepLock) {
+          if (triggerWake) break;
+          sleepLock.wait(waitTime);
+        }
+        woke = EnvironmentEdgeManager.currentTime();
+      } catch (InterruptedException iex) {
+        return false;
+      }
+      // Recalculate waitTime.
+      woke = (woke == -1) ? EnvironmentEdgeManager.currentTime() : woke;
+      waitTime = waitTime - (woke - startTime);
+    }
+    return true;
+  }
+
+  public void add(Map.Entry<byte[], List<Action<T>>> e) {
+    actions.add(e.getKey(), e.getValue());
+  }
+
+  public MultiAction<T> getActions() {
+    return actions;
+  }
+
+  public long getSleepTime() {
+    return sleepTime;
+  }
+}
\ No newline at end of file
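
A usage sketch (hypothetical wiring; AsyncProcess does the equivalent
internally when the computed backoff is positive):

    // 'backoffMs', 'regionEntry', 'sendRunnable', and 'pool' are placeholders
    DelayingRunner<Row> runner = new DelayingRunner<Row>(backoffMs, regionEntry);
    runner.setRunner(sendRunnable); // the real work, run after the sleep
    pool.submit(runner);            // plain ExecutorService stands in for a scheduled one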

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index fd0470a..0508fce 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -1887,8 +1887,9 @@ public class HTable implements HTableInterface, RegionLocator {
 
     AsyncProcess asyncProcess =
         new AsyncProcess(connection, configuration, pool,
-            RpcRetryingCallerFactory.instantiate(configuration), true,
-            RpcControllerFactory.instantiate(configuration));
+            RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
+            true, RpcControllerFactory.instantiate(configuration));
+
     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
         new Callback<ClientProtos.CoprocessorServiceResult>() {
           @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
index 8f1dc4d..16ab852 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.client;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -68,12 +69,24 @@ public final class MultiAction<R> {
    * @param a
    */
   public void add(byte[] regionName, Action<R> a) {
+    add(regionName, Arrays.asList(a));
+  }
+
+  /**
+   * Add a list of Actions to this container based on their regionName. If the regionName
+   * is wrong, the initial execution will fail, but will be automatically
+   * retried after looking up the correct region.
+   *
+   * @param regionName name of the region the actions are targeted at
+   * @param actionList list of actions to add for the region
+   */
+  public void add(byte[] regionName, List<Action<R>> actionList) {
     List<Action<R>> rsActions = actions.get(regionName);
     if (rsActions == null) {
-      rsActions = new ArrayList<Action<R>>();
+      rsActions = new ArrayList<Action<R>>(actionList.size());
       actions.put(regionName, rsActions);
     }
-    rsActions.add(a);
+    rsActions.addAll(actionList);
   }
 
   public void setNonceGroup(long nonceGroup) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
index 8303121..08d9b80 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -94,6 +95,7 @@ public class Result implements CellScannable, CellScanner {
    * Index for where we are when Result is acting as a {@link CellScanner}.
    */
   private int cellScannerIndex = INITIAL_CELLSCANNER_INDEX;
+  private ClientProtos.RegionLoadStats stats;
 
   /**
    * Creates an empty Result w/ no KeyValue payload; returns null if you call {@link #rawCells()}.
@@ -794,4 +796,20 @@ public class Result implements CellScannable, CellScanner {
   public boolean isStale() {
     return stale;
   }
-}
+
+  /**
+   * Attach load information about the region to this result
+   * @param loadStats statistics about the current region from which this was returned
+   */
+  public void addResults(ClientProtos.RegionLoadStats loadStats) {
+    this.stats = loadStats;
+  }
+
+  /**
+   * @return the associated statistics about the region from which this was returned. Can be
+   * <tt>null</tt> if stats are disabled.
+   */
+  public ClientProtos.RegionLoadStats getStats() {
+    return stats;
+  }
+}
\ No newline at end of file
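
A small sketch of consuming the new accessor (assumes a table and row are
already set up; the null check covers the stats-disabled case called out in
the javadoc):

    Result result = table.get(new Get(row));
    ClientProtos.RegionLoadStats stats = result.getStats();
    if (stats != null) {
      int memstorePct = stats.getMemstoreLoad(); // 0-100, per the proto definition
    }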

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
new file mode 100644
index 0000000..3caa63e
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+
+/**
+ * Helper to update a {@link ServerStatisticTracker} with the server/region
+ * statistics attached to a {@link Result}
+ */
+@InterfaceAudience.Private
+public final class ResultStatsUtil {
+
+  private ResultStatsUtil() {
+    //private ctor for util class
+  }
+
+  /**
+   * Update the stats for the specified region if the result is an instance of
+   * {@link Result} and carries region load statistics.
+   *
+   * @param r object that contains the result and possibly the statistics about the region
+   * @param serverStats stats tracker to update from the result
+   * @param server server from which the result was obtained
+   * @param regionName full region name for the stats.
+   * @return the passed-in object, unchanged; the stats are recorded as a side effect
+   * when present
+   */
+  public static <T> T updateStats(T r, ServerStatisticTracker serverStats,
+      ServerName server, byte[] regionName) {
+    if (!(r instanceof Result)) {
+      return r;
+    }
+    Result result = (Result) r;
+    // early exit if there are no stats to collect
+    ClientProtos.RegionLoadStats stats = result.getStats();
+    if (stats == null) {
+      return r;
+    }
+
+    if (regionName != null) {
+      serverStats.updateRegionStats(server, regionName, stats);
+    }
+
+    return r;
+  }
+
+  public static <T> T updateStats(T r, ServerStatisticTracker stats,
+      HRegionLocation regionLocation) {
+    byte[] regionName = null;
+    ServerName server = null;
+    if (regionLocation != null) {
+      server = regionLocation.getServerName();
+      regionName = regionLocation.getRegionInfo().getRegionName();
+    }
+
+    return updateStats(r, stats, server, regionName);
+  }
+}
\ No newline at end of file
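
For reference, this mirrors how AsyncProcess invokes the helper when a tracker
is configured (see the AsyncProcess hunk above):

    result = ResultStatsUtil.updateStats(result, tracker, server, regionName);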

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
index a2c4d99..807c227 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,93 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.client;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.ExceptionUtil;
-import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 
-import com.google.protobuf.ServiceException;
+import java.io.IOException;
 
 /**
- * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client
- * threadlocal outstanding timeouts as so we don't persist too much.
- * Dynamic rather than static so can set the generic appropriately.
+ * An RPC retrying caller: retries the given {@link RetryingCallable} until it succeeds,
+ * retries are exhausted, or the caller is cancelled.
  *
- * This object has a state. It should not be used by in parallel by different threads.
- * Reusing it is possible however, even between multiple threads. However, the user will
- *  have to manage the synchronization on its side: there is no synchronization inside the class.
  */
-@InterfaceAudience.Private
-public class RpcRetryingCaller<T> {
-  public static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class);
-  /**
-   * When we started making calls.
-   */
-  private long globalStartTime;
-  /**
-   * Start and end times for a single call.
-   */
-  private final static int MIN_RPC_TIMEOUT = 2000;
-  /** How many retries are allowed before we start to log */
-  private final int startLogErrorsCnt;
-
-  private final long pause;
-  private final int retries;
-  private final AtomicBoolean cancelled = new AtomicBoolean(false);
-  private final RetryingCallerInterceptor interceptor;
-  private final RetryingCallerInterceptorContext context;
-
-  public RpcRetryingCaller(long pause, int retries, int startLogErrorsCnt) {
-    this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt);
-  }
-  
-  public RpcRetryingCaller(long pause, int retries,
-      RetryingCallerInterceptor interceptor, int startLogErrorsCnt) {
-    this.pause = pause;
-    this.retries = retries;
-    this.interceptor = interceptor;
-    context = interceptor.createEmptyContext();
-    this.startLogErrorsCnt = startLogErrorsCnt;
-  }
-
-  private int getRemainingTime(int callTimeout) {
-    if (callTimeout <= 0) {
-      return 0;
-    } else {
-      if (callTimeout == Integer.MAX_VALUE) return Integer.MAX_VALUE;
-      int remainingTime = (int) (callTimeout -
-          (EnvironmentEdgeManager.currentTime() - this.globalStartTime));
-      if (remainingTime < MIN_RPC_TIMEOUT) {
-        // If there is no time left, we're trying anyway. It's too late.
-        // 0 means no timeout, and it's not the intent here. So we secure both cases by
-        // resetting to the minimum.
-        remainingTime = MIN_RPC_TIMEOUT;
-      }
-      return remainingTime;
-    }
-  }
-  
-  public void cancel(){
-    cancelled.set(true);
-    synchronized (cancelled){
-      cancelled.notifyAll();
-    }
-  }
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface RpcRetryingCaller<T> {
+  void cancel();
 
   /**
    * Retries if invocation fails.
@@ -112,75 +38,8 @@ public class RpcRetryingCaller<T> {
    * @throws IOException if a remote or network exception occurs
    * @throws RuntimeException other unspecified error
    */
-  public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
-  throws IOException, RuntimeException {
-    List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
-      new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
-    this.globalStartTime = EnvironmentEdgeManager.currentTime();
-    context.clear();
-    for (int tries = 0;; tries++) {
-      long expectedSleep;
-      try {
-        callable.prepare(tries != 0); // if called with false, check table status on ZK
-        interceptor.intercept(context.prepare(callable, tries));
-        return callable.call(getRemainingTime(callTimeout));
-      } catch (PreemptiveFastFailException e) {
-        throw e;
-      } catch (Throwable t) {
-        ExceptionUtil.rethrowIfInterrupt(t);
-        if (tries > startLogErrorsCnt) {
-          LOG.info("Call exception, tries=" + tries + ", retries=" + retries + ", started=" +
-              (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, "
-              + "cancelled=" + cancelled.get() + ", msg="
-              + callable.getExceptionMessageAdditionalDetail());
-        }
-
-        // translateException throws exception when should not retry: i.e. when request is bad.
-        interceptor.handleFailure(context, t);
-        t = translateException(t);
-        callable.throwable(t, retries != 1);
-        RetriesExhaustedException.ThrowableWithExtraContext qt =
-            new RetriesExhaustedException.ThrowableWithExtraContext(t,
-                EnvironmentEdgeManager.currentTime(), toString());
-        exceptions.add(qt);
-        if (tries >= retries - 1) {
-          throw new RetriesExhaustedException(tries, exceptions);
-        }
-        // If the server is dead, we need to wait a little before retrying, to give
-        //  a chance to the regions to be
-        // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
-        expectedSleep = callable.sleep(pause, tries + 1);
-
-        // If, after the planned sleep, there won't be enough time left, we stop now.
-        long duration = singleCallDuration(expectedSleep);
-        if (duration > callTimeout) {
-          String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration +
-              ": " + callable.getExceptionMessageAdditionalDetail();
-          throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
-        }
-      } finally {
-        interceptor.updateFailureInfo(context);
-      }
-      try {
-        if (expectedSleep > 0) {
-          synchronized (cancelled) {
-            if (cancelled.get()) return null;
-            cancelled.wait(expectedSleep);
-          }
-        }
-        if (cancelled.get()) return null;
-      } catch (InterruptedException e) {
-        throw new InterruptedIOException("Interrupted after " + tries + " tries  on " + retries);
-      }
-    }
-  }
-
-  /**
-   * @return Calculate how long a single call took
-   */
-  private long singleCallDuration(final long expectedSleep) {
-    return (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + expectedSleep;
-  }
+  T callWithRetries(RetryingCallable<T> callable, int callTimeout)
+  throws IOException, RuntimeException;
 
   /**
    * Call the server once only.
@@ -191,62 +50,6 @@ public class RpcRetryingCaller<T> {
    * @throws IOException if a remote or network exception occurs
    * @throws RuntimeException other unspecified error
    */
-  public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
-  throws IOException, RuntimeException {
-    // The code of this method should be shared with withRetries.
-    this.globalStartTime = EnvironmentEdgeManager.currentTime();
-    try {
-      callable.prepare(false);
-      return callable.call(callTimeout);
-    } catch (Throwable t) {
-      Throwable t2 = translateException(t);
-      ExceptionUtil.rethrowIfInterrupt(t2);
-      // It would be nice to clear the location cache here.
-      if (t2 instanceof IOException) {
-        throw (IOException)t2;
-      } else {
-        throw new RuntimeException(t2);
-      }
-    }
-  }
-  
-  /**
-   * Get the good or the remote exception if any, throws the DoNotRetryIOException.
-   * @param t the throwable to analyze
-   * @return the translated exception, if it's not a DoNotRetryIOException
-   * @throws DoNotRetryIOException - if we find it, we throw it instead of translating.
-   */
-  static Throwable translateException(Throwable t) throws DoNotRetryIOException {
-    if (t instanceof UndeclaredThrowableException) {
-      if (t.getCause() != null) {
-        t = t.getCause();
-      }
-    }
-    if (t instanceof RemoteException) {
-      t = ((RemoteException)t).unwrapRemoteException();
-    }
-    if (t instanceof LinkageError) {
-      throw new DoNotRetryIOException(t);
-    }
-    if (t instanceof ServiceException) {
-      ServiceException se = (ServiceException)t;
-      Throwable cause = se.getCause();
-      if (cause != null && cause instanceof DoNotRetryIOException) {
-        throw (DoNotRetryIOException)cause;
-      }
-      // Don't let ServiceException out; its rpc specific.
-      t = cause;
-      // t could be a RemoteException so go aaround again.
-      translateException(t);
-    } else if (t instanceof DoNotRetryIOException) {
-      throw (DoNotRetryIOException)t;
-    }
-    return t;
-  }
-
-  @Override
-  public String toString() {
-    return "RpcRetryingCaller{" + "globalStartTime=" + globalStartTime +
-        ", pause=" + pause + ", retries=" + retries + '}';
-  }
+  T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
+  throws IOException, RuntimeException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
index 9f05997..6f2760f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
@@ -35,6 +35,8 @@ public class RpcRetryingCallerFactory {
   private final int retries;
   private final RetryingCallerInterceptor interceptor;
   private final int startLogErrorsCnt;
+  private final boolean enableBackPressure;
+  private ServerStatisticTracker stats;
 
   public RpcRetryingCallerFactory(Configuration conf) {
     this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
@@ -49,27 +51,53 @@ public class RpcRetryingCallerFactory {
     startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
         AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
     this.interceptor = interceptor;
+    enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
+        HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
+  }
+
+  /**
+   * Set the tracker that should be used for tracking statistics about the server
+   */
+  public void setStatisticTracker(ServerStatisticTracker statisticTracker) {
+    this.stats = statisticTracker;
   }
 
   public <T> RpcRetryingCaller<T> newCaller() {
     // We store the values in the factory instance. This way, constructing new objects
     //  is cheap as it does not require parsing a complex structure.
-      return new RpcRetryingCaller<T>(pause, retries, interceptor, startLogErrorsCnt);
+    RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<T>(pause, retries, interceptor,
+        startLogErrorsCnt);
+
+    // wrap it with stats, if we are tracking them
+    if (enableBackPressure && this.stats != null) {
+      caller = new StatsTrackingRpcRetryingCaller<T>(caller, this.stats);
+    }
+
+    return caller;
   }
 
   public static RpcRetryingCallerFactory instantiate(Configuration configuration) {
-    return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
+    return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
   }
-  
+
   public static RpcRetryingCallerFactory instantiate(Configuration configuration,
-      RetryingCallerInterceptor interceptor) {
+      ServerStatisticTracker stats) {
+    return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats);
+  }
+
+  public static RpcRetryingCallerFactory instantiate(Configuration configuration,
+      RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) {
     String clazzName = RpcRetryingCallerFactory.class.getName();
     String rpcCallerFactoryClazz =
         configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
     if (rpcCallerFactoryClazz.equals(clazzName)) {
       return new RpcRetryingCallerFactory(configuration, interceptor);
     }
-    return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
-      new Class[] { Configuration.class }, new Object[] { configuration });
+    RpcRetryingCallerFactory factory = ReflectionUtils.instantiateWithCustomCtor(
+        rpcCallerFactoryClazz, new Class[] { Configuration.class }, new Object[] { configuration });
+
+    // setting for backwards compat with existing caller factories, rather than in the ctor
+    factory.setStatisticTracker(stats);
+    return factory;
   }
-}
+}
\ No newline at end of file
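
A short sketch of wiring the factory up with stats (all names from this patch;
ServerStatisticTracker.create(conf) returns null when backpressure is
disabled, in which case no stats wrapper is applied):

    ServerStatisticTracker tracker = ServerStatisticTracker.create(conf);
    RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, tracker);
    RpcRetryingCaller<Result> caller = factory.<Result> newCaller();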

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
new file mode 100644
index 0000000..1d037bc
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
@@ -0,0 +1,238 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
+import org.apache.hadoop.ipc.RemoteException;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client
+ * threadlocal outstanding timeouts so we don't persist too much.
+ * Dynamic rather than static so we can set the generic appropriately.
+ *
+ * This object has state. It should not be used in parallel by different threads.
+ * Reusing it is possible, even between multiple threads. However, the user will
+ * have to manage the synchronization on its side: there is no synchronization inside the class.
+ */
+@InterfaceAudience.Private
+public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
+  public static final Log LOG = LogFactory.getLog(RpcRetryingCallerImpl.class);
+  /**
+   * When we started making calls.
+   */
+  private long globalStartTime;
+  /**
+   * Floor, in milliseconds, for the timeout given to a single call.
+   */
+  private final static int MIN_RPC_TIMEOUT = 2000;
+  /** How many retries are allowed before we start to log */
+  private final int startLogErrorsCnt;
+
+  private final long pause;
+  private final int retries;
+  private final AtomicBoolean cancelled = new AtomicBoolean(false);
+  private final RetryingCallerInterceptor interceptor;
+  private final RetryingCallerInterceptorContext context;
+
+  public RpcRetryingCallerImpl(long pause, int retries, int startLogErrorsCnt) {
+    this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt);
+  }
+  
+  public RpcRetryingCallerImpl(long pause, int retries,
+      RetryingCallerInterceptor interceptor, int startLogErrorsCnt) {
+    this.pause = pause;
+    this.retries = retries;
+    this.interceptor = interceptor;
+    context = interceptor.createEmptyContext();
+    this.startLogErrorsCnt = startLogErrorsCnt;
+  }
+
+  private int getRemainingTime(int callTimeout) {
+    if (callTimeout <= 0) {
+      return 0;
+    } else {
+      if (callTimeout == Integer.MAX_VALUE) return Integer.MAX_VALUE;
+      int remainingTime = (int) (callTimeout -
+          (EnvironmentEdgeManager.currentTime() - this.globalStartTime));
+      if (remainingTime < MIN_RPC_TIMEOUT) {
+        // If there is no time left, we're trying anyway. It's too late.
+        // 0 means no timeout, and it's not the intent here. So we secure both cases by
+        // resetting to the minimum.
+        remainingTime = MIN_RPC_TIMEOUT;
+      }
+      return remainingTime;
+    }
+  }
+  
+  @Override
+  public void cancel(){
+    cancelled.set(true);
+    synchronized (cancelled){
+      cancelled.notifyAll();
+    }
+  }
+
+  @Override
+  public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
+  throws IOException, RuntimeException {
+    List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
+      new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
+    this.globalStartTime = EnvironmentEdgeManager.currentTime();
+    context.clear();
+    for (int tries = 0;; tries++) {
+      long expectedSleep;
+      try {
+        callable.prepare(tries != 0); // if called with false, check table status on ZK
+        interceptor.intercept(context.prepare(callable, tries));
+        return callable.call(getRemainingTime(callTimeout));
+      } catch (PreemptiveFastFailException e) {
+        throw e;
+      } catch (Throwable t) {
+        ExceptionUtil.rethrowIfInterrupt(t);
+        if (tries > startLogErrorsCnt) {
+          LOG.info("Call exception, tries=" + tries + ", retries=" + retries + ", started=" +
+              (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, "
+              + "cancelled=" + cancelled.get() + ", msg="
+              + callable.getExceptionMessageAdditionalDetail());
+        }
+
+        // translateException throws exception when should not retry: i.e. when request is bad.
+        interceptor.handleFailure(context, t);
+        t = translateException(t);
+        callable.throwable(t, retries != 1);
+        RetriesExhaustedException.ThrowableWithExtraContext qt =
+            new RetriesExhaustedException.ThrowableWithExtraContext(t,
+                EnvironmentEdgeManager.currentTime(), toString());
+        exceptions.add(qt);
+        if (tries >= retries - 1) {
+          throw new RetriesExhaustedException(tries, exceptions);
+        }
+        // If the server is dead, we need to wait a little before retrying, to give
+        // the regions a chance to be reassigned elsewhere.
+        // tries hasn't been bumped up yet so we use "tries + 1" to get the right pause time
+        expectedSleep = callable.sleep(pause, tries + 1);
+
+        // If, after the planned sleep, there won't be enough time left, we stop now.
+        long duration = singleCallDuration(expectedSleep);
+        if (duration > callTimeout) {
+          String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration +
+              ": " + callable.getExceptionMessageAdditionalDetail();
+          throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
+        }
+      } finally {
+        interceptor.updateFailureInfo(context);
+      }
+      try {
+        if (expectedSleep > 0) {
+          synchronized (cancelled) {
+            if (cancelled.get()) return null;
+            cancelled.wait(expectedSleep);
+          }
+        }
+        if (cancelled.get()) return null;
+      } catch (InterruptedException e) {
+        throw new InterruptedIOException("Interrupted after " + tries + " tries out of " + retries);
+      }
+    }
+  }
+
+  /**
+   * @return how long the call has taken so far, plus the expected sleep time
+   */
+  private long singleCallDuration(final long expectedSleep) {
+    return (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + expectedSleep;
+  }
+
+  @Override
+  public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
+  throws IOException, RuntimeException {
+    // The code of this method should be shared with withRetries.
+    this.globalStartTime = EnvironmentEdgeManager.currentTime();
+    try {
+      callable.prepare(false);
+      return callable.call(callTimeout);
+    } catch (Throwable t) {
+      Throwable t2 = translateException(t);
+      ExceptionUtil.rethrowIfInterrupt(t2);
+      // It would be nice to clear the location cache here.
+      if (t2 instanceof IOException) {
+        throw (IOException)t2;
+      } else {
+        throw new RuntimeException(t2);
+      }
+    }
+  }
+  
+  /**
+   * Unwrap the throwable to the underlying exception, if any.
+   * @param t the throwable to analyze
+   * @return the translated exception, if it is not a DoNotRetryIOException
+   * @throws DoNotRetryIOException if we find one, we throw it instead of translating
+   */
+  static Throwable translateException(Throwable t) throws DoNotRetryIOException {
+    if (t instanceof UndeclaredThrowableException) {
+      if (t.getCause() != null) {
+        t = t.getCause();
+      }
+    }
+    if (t instanceof RemoteException) {
+      t = ((RemoteException)t).unwrapRemoteException();
+    }
+    if (t instanceof LinkageError) {
+      throw new DoNotRetryIOException(t);
+    }
+    if (t instanceof ServiceException) {
+      ServiceException se = (ServiceException)t;
+      Throwable cause = se.getCause();
+      if (cause != null && cause instanceof DoNotRetryIOException) {
+        throw (DoNotRetryIOException)cause;
+      }
+      // Don't let ServiceException out; it's RPC specific.
+      t = cause;
+      // t could be a RemoteException, so go around again.
+      t = translateException(t);
+    } else if (t instanceof DoNotRetryIOException) {
+      throw (DoNotRetryIOException)t;
+    }
+    return t;
+  }
+
+  @Override
+  public String toString() {
+    return "RpcRetryingCaller{" + "globalStartTime=" + globalStartTime +
+        ", pause=" + pause + ", retries=" + retries + '}';
+  }
+}
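
To make the budget arithmetic above concrete, here is a minimal sketch, not part of the
patch, of the check in callWithRetries that stops early instead of sleeping past the
deadline; the class name and all timing values are invented:

    public class TimeoutBudgetSketch {
      public static void main(String[] args) {
        long callTimeout = 10000;                                  // caller allows 10s in total
        long globalStartTime = System.currentTimeMillis() - 8000;  // pretend 8s already elapsed
        long expectedSleep = 3000;                                 // planned pause before the retry

        // mirrors singleCallDuration(): time spent so far plus the planned sleep
        long duration = (System.currentTimeMillis() - globalStartTime) + expectedSleep;
        if (duration > callTimeout) {
          // callWithRetries throws a SocketTimeoutException here instead of sleeping
          System.out.println("stop now: projected " + duration + "ms > budget " + callTimeout + "ms");
        }
      }
    }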

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
new file mode 100644
index 0000000..0c7b683
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Tracks the per-region load statistics reported by each server
+ */
+@InterfaceAudience.Private
+public class ServerStatisticTracker {
+
+  private final Map<ServerName, ServerStatistics> stats =
+      new ConcurrentHashMap<ServerName, ServerStatistics>();
+
+  public void updateRegionStats(ServerName server, byte[] region, ClientProtos.RegionLoadStats
+      currentStats) {
+    ServerStatistics stat = stats.get(server);
+
+    if (stat == null) {
+      // create a stats object and update the stats
+      synchronized (this) {
+        stat = stats.get(server);
+        // we don't have stats for that server yet, so we need to make some
+        if (stat == null) {
+          stat = new ServerStatistics();
+          stats.put(server, stat);
+        }
+      }
+    }
+    stat.update(region, currentStats);
+  }
+
+  public ServerStatistics getStats(ServerName server) {
+    return this.stats.get(server);
+  }
+
+  public static ServerStatisticTracker create(Configuration conf) {
+    if (!conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
+        HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE)) {
+      return null;
+    }
+    return new ServerStatisticTracker();
+  }
+
+  @VisibleForTesting
+  ServerStatistics getServerStatsForTesting(ServerName server) {
+    return stats.get(server);
+  }
+}
\ No newline at end of file
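
A minimal usage sketch for the tracker, not part of the patch; the hostname, port, start
code, region name, and load value are all invented:

    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.client.ServerStatisticTracker;
    import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
    import org.apache.hadoop.hbase.util.Bytes;

    public class TrackerSketch {
      public static void main(String[] args) {
        ServerStatisticTracker tracker = new ServerStatisticTracker();
        ServerName server = ServerName.valueOf("rs1.example.com", 16020, 12345L);

        // pretend the server reported a 42% memstore load for one region
        ClientProtos.RegionLoadStats load = ClientProtos.RegionLoadStats.newBuilder()
            .setMemstoreLoad(42).build();
        tracker.updateRegionStats(server, Bytes.toBytes("regionA"), load);

        ServerStatistics stats = tracker.getStats(server);
        System.out.println(stats.getStatsForRegion(Bytes.toBytes("regionA"))
            .getMemstoreLoadPercent());  // prints 42
      }
    }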

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
new file mode 100644
index 0000000..cec0ee5
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import java.io.IOException;
+
+/**
+ * An {@link RpcRetryingCaller} that will update the per-region stats for the call on return,
+ * if stats are available
+ */
+@InterfaceAudience.Private
+public class StatsTrackingRpcRetryingCaller<T> implements RpcRetryingCaller<T> {
+  private final ServerStatisticTracker stats;
+  private final RpcRetryingCaller<T> delegate;
+
+  public StatsTrackingRpcRetryingCaller(RpcRetryingCaller<T> delegate,
+      ServerStatisticTracker stats) {
+    this.delegate = delegate;
+    this.stats = stats;
+  }
+
+  @Override
+  public void cancel() {
+    delegate.cancel();
+  }
+
+  @Override
+  public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
+      throws IOException, RuntimeException {
+    T result = delegate.callWithRetries(callable, callTimeout);
+    return updateStatsAndUnwrap(result, callable);
+  }
+
+  @Override
+  public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
+      throws IOException, RuntimeException {
+    T result = delegate.callWithoutRetries(callable, callTimeout);
+    return updateStatsAndUnwrap(result, callable);
+  }
+
+  private T updateStatsAndUnwrap(T result, RetryingCallable<T> callable) {
+    // don't track stats about requests that aren't to regionservers
+    if (!(callable instanceof RegionServerCallable)) {
+      return result;
+    }
+
+    // multi-server callables span multiple regions, so they don't have a single location;
+    // they are still region server callables, so we have to handle them when we process
+    // the result, not in here
+    if (callable instanceof MultiServerCallable) {
+      return result;
+    }
+
+    // update the stats for the single server callable
+    RegionServerCallable<T> regionCallable = (RegionServerCallable) callable;
+    HRegionLocation location = regionCallable.getLocation();
+    return ResultStatsUtil.updateStats(result, stats, location);
+  }
+}
\ No newline at end of file
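
How the decorator composes with a plain caller, sketched; not part of the patch, and
assuming RpcRetryingCallerImpl and its three-int constructor (pause, retries,
startLogErrorsCnt) are accessible with the same values the updated tests use further down:

    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.RpcRetryingCaller;
    import org.apache.hadoop.hbase.client.RpcRetryingCallerImpl;
    import org.apache.hadoop.hbase.client.ServerStatisticTracker;
    import org.apache.hadoop.hbase.client.StatsTrackingRpcRetryingCaller;

    public class DecoratorSketch {
      public static void main(String[] args) {
        RpcRetryingCaller<Result> plain = new RpcRetryingCallerImpl<Result>(100, 10, 9);
        ServerStatisticTracker tracker = new ServerStatisticTracker();
        // the wrapper feeds RegionLoadStats from each response back into the tracker
        RpcRetryingCaller<Result> caller =
            new StatsTrackingRpcRetryingCaller<Result>(plain, tracker);
      }
    }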

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java
new file mode 100644
index 0000000..94e434f
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Configurable policy for how long a client should wait before sending a new request to the
+ * server, given the server's load statistics.
+ * <p>
+ * Implementations must have a single-argument constructor that takes a {@link org.apache.hadoop.conf.Configuration}
+ * </p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ClientBackoffPolicy {
+
+  public static final String BACKOFF_POLICY_CLASS =
+      "hbase.client.statistics.backoff-policy";
+
+  /**
+   * @return the number of ms the client should wait, given the current load statistics for the region
+   */
+  public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats);
+}
\ No newline at end of file
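
A sketch of a custom policy satisfying the contract above; not part of the patch, the
class name is invented, and the flat 100ms figure is arbitrary:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
    import org.apache.hadoop.hbase.client.backoff.ServerStatistics;

    public class FlatBackoffPolicy implements ClientBackoffPolicy {
      public FlatBackoffPolicy(Configuration conf) {
        // the single-Configuration-argument constructor required by the contract
      }

      @Override
      public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) {
        // no stats yet means no reason to wait; otherwise wait a flat 100ms
        return stats == null ? 0 : 100;
      }
    }

Such a policy would be selected through the hbase.client.statistics.backoff-policy key
handled by the factory in the next file.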

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java
new file mode 100644
index 0000000..879a0e2
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class ClientBackoffPolicyFactory {
+
+  private static final Log LOG = LogFactory.getLog(ClientBackoffPolicyFactory.class);
+
+  private ClientBackoffPolicyFactory() {
+  }
+
+  public static ClientBackoffPolicy create(Configuration conf) {
+    // create the backoff policy
+    String className =
+        conf.get(ClientBackoffPolicy.BACKOFF_POLICY_CLASS,
+            NoBackoffPolicy.class.getName());
+    return ReflectionUtils.instantiateWithCustomCtor(className,
+        new Class<?>[] { Configuration.class }, new Object[] { conf });
+  }
+
+  /**
+   * Default backoff policy that doesn't create any backoff for the client, regardless of load
+   */
+  public static class NoBackoffPolicy implements ClientBackoffPolicy {
+    public NoBackoffPolicy(Configuration conf){
+      // necessary to meet contract
+    }
+
+    @Override
+    public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) {
+      return 0;
+    }
+  }
+}
\ No newline at end of file
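
Wiring it all together, a sketch that is not part of the patch; it relies only on the
BACKOFF_POLICY_CLASS key above and the ENABLE_CLIENT_BACKPRESSURE key added to HConstants
at the end of this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
    import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
    import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;

    public class WiringSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // opt in to client backpressure (defaults to false)
        conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
        // swap the default NoBackoffPolicy for the exponential one
        conf.set(ClientBackoffPolicy.BACKOFF_POLICY_CLASS,
            ExponentialClientBackoffPolicy.class.getName());
        ClientBackoffPolicy policy = ClientBackoffPolicyFactory.create(conf);
        System.out.println(policy.getClass().getSimpleName());  // ExponentialClientBackoffPolicy
      }
    }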

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java
new file mode 100644
index 0000000..6e75670
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Simple exponential backoff policy for the client that multiplies percent^4 by the
+ * max backoff to generate the backoff time.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ExponentialClientBackoffPolicy implements ClientBackoffPolicy {
+
+  private static final Log LOG = LogFactory.getLog(ExponentialClientBackoffPolicy.class);
+
+  private static final long ONE_MINUTE = 60 * 1000;
+  public static final long DEFAULT_MAX_BACKOFF = 5 * ONE_MINUTE;
+  public static final String MAX_BACKOFF_KEY = "hbase.client.exponential-backoff.max";
+  private long maxBackoff;
+
+  public ExponentialClientBackoffPolicy(Configuration conf) {
+    this.maxBackoff = conf.getLong(MAX_BACKOFF_KEY, DEFAULT_MAX_BACKOFF);
+  }
+
+  @Override
+  public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) {
+    // no stats for the server yet, so don't backoff
+    if (stats == null) {
+      return 0;
+    }
+
+    ServerStatistics.RegionStatistics regionStats = stats.getStatsForRegion(region);
+    // no stats for the region yet - don't backoff
+    if (regionStats == null) {
+      return 0;
+    }
+
+    // raise the load fraction (a value between 0 and 1) to the 4th power: as the load
+    // nears 100 percent the multiplier nears 1, and the power gives the exponential curve
+    double percent = regionStats.getMemstoreLoadPercent() / 100.0;
+    double multiplier = Math.pow(percent, 4.0);
+    // shouldn't ever happen, but just in case something changes in the statistic data
+    if (multiplier > 1) {
+      LOG.warn("Somehow got a backoff multiplier greater than the allowed backoff. Forcing back " +
+          "down to the max backoff");
+      multiplier = 1;
+    }
+    return (long) (multiplier * maxBackoff);
+  }
+}
\ No newline at end of file
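
For a feel of the percent^4 curve, a quick arithmetic check that is not part of the patch,
using the default 5-minute cap:

    public class CurveSketch {
      public static void main(String[] args) {
        long maxBackoff = 5 * 60 * 1000;  // DEFAULT_MAX_BACKOFF, 5 minutes in ms
        for (int load : new int[] { 10, 50, 90, 100 }) {
          double multiplier = Math.pow(load / 100.0, 4.0);
          System.out.printf("load=%d%% -> backoff=%dms%n", load, (long) (multiplier * maxBackoff));
        }
        // prints: 30ms, 18750ms, 196830ms, 300000ms
      }
    }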

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java
new file mode 100644
index 0000000..a3b8e11
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Track the statistics for the regions of a single server
+ */
+@InterfaceAudience.Private
+public class ServerStatistics {
+
+  private Map<byte[], RegionStatistics>
+      stats = new TreeMap<byte[], RegionStatistics>(Bytes.BYTES_COMPARATOR);
+
+  /**
+   * Best-effort update; last writer wins. It doesn't really matter which concurrent writer
+   * gets to update, as long as something gets set.
+   * @param region the region the stats are for
+   * @param currentStats the latest stats reported by the server
+   */
+  public void update(byte[] region, ClientProtos.RegionLoadStats currentStats) {
+    RegionStatistics regionStat = this.stats.get(region);
+    if(regionStat == null){
+      regionStat = new RegionStatistics();
+      this.stats.put(region, regionStat);
+    }
+
+    regionStat.update(currentStats);
+  }
+
+  @InterfaceAudience.Private
+  public RegionStatistics getStatsForRegion(byte[] regionName){
+    return stats.get(regionName);
+  }
+
+  public static class RegionStatistics{
+    private int memstoreLoad = 0;
+
+    public void update(ClientProtos.RegionLoadStats currentStats) {
+      this.memstoreLoad = currentStats.getMemstoreLoad();
+    }
+
+    public int getMemstoreLoadPercent(){
+      return this.memstoreLoad;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
index 8c75f4f..8433cee 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
@@ -61,7 +61,7 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
     this.connection = conn;
     this.table = table;
     this.row = row;
-    this.rpcFactory = RpcRetryingCallerFactory.instantiate(conn.getConfiguration());
+    this.rpcFactory = RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null);
     this.operationTimeout = conn.getConfiguration().getInt(
         HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
index 70da40c..1d42a82 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
@@ -114,17 +114,23 @@ public final class ResponseConverter {
       }
 
       for (ResultOrException roe : actionResult.getResultOrExceptionList()) {
+        Object responseValue;
         if (roe.hasException()) {
-          results.add(regionName, roe.getIndex(), ProtobufUtil.toException(roe.getException()));
+          responseValue = ProtobufUtil.toException(roe.getException());
         } else if (roe.hasResult()) {
-          results.add(regionName, roe.getIndex(), ProtobufUtil.toResult(roe.getResult(), cells));
+          responseValue = ProtobufUtil.toResult(roe.getResult(), cells);
+          // add the load stats, if we got any
+          if (roe.hasLoadStats()) {
+            ((Result) responseValue).addResults(roe.getLoadStats());
+          }
         } else if (roe.hasServiceResult()) {
-          results.add(regionName, roe.getIndex(), roe.getServiceResult());
+          responseValue = roe.getServiceResult();
         } else {
           // no result & no exception. Unexpected.
           throw new IllegalStateException("No result & no exception roe=" + roe +
               " for region " + actions.getRegion());
         }
+        results.add(regionName, roe.getIndex(), responseValue);
       }
     }
 
@@ -149,9 +155,11 @@ public final class ResponseConverter {
    * @param r
    * @return an action result builder
    */
-  public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r) {
+  public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r,
+      ClientProtos.RegionLoadStats stats) {
     ResultOrException.Builder builder = ResultOrException.newBuilder();
     if (r != null) builder.setResult(r);
+    if(stats != null) builder.setLoadStats(stats);
     return builder;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index d219638..88a95fb 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -190,7 +190,7 @@ public class TestAsyncProcess {
             }
           });
 
-      return new RpcRetryingCaller<MultiResponse>(100, 10, 9) {
+      return new RpcRetryingCallerImpl<MultiResponse>(100, 10, 9) {
         @Override
         public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
                                                 int callTimeout)
@@ -208,7 +208,7 @@ public class TestAsyncProcess {
     }
   }
 
-  static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{
+  static class CallerWithFailure extends RpcRetryingCallerImpl<MultiResponse>{
 
     public CallerWithFailure() {
       super(100, 100, 9);
@@ -294,7 +294,7 @@ public class TestAsyncProcess {
         replicaCalls.incrementAndGet();
       }
 
-      return new RpcRetryingCaller<MultiResponse>(100, 10, 9) {
+      return new RpcRetryingCallerImpl<MultiResponse>(100, 10, 9) {
         @Override
         public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
         throws IOException, RuntimeException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java
new file mode 100644
index 0000000..88e409d
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
+import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestClientExponentialBackoff {
+
+  ServerName server = Mockito.mock(ServerName.class);
+  byte[] regionname = Bytes.toBytes("region");
+
+  @Test
+  public void testNulls() {
+    Configuration conf = new Configuration(false);
+    ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf);
+    assertEquals(0, backoff.getBackoffTime(null, null, null));
+
+    // server name doesn't matter to the calculation, but check it now anyway
+    assertEquals(0, backoff.getBackoffTime(server, null, null));
+    assertEquals(0, backoff.getBackoffTime(server, regionname, null));
+
+    // check when no stats for the region yet
+    ServerStatistics stats = new ServerStatistics();
+    assertEquals(0, backoff.getBackoffTime(server, regionname, stats));
+  }
+
+  @Test
+  public void testMaxLoad() {
+    Configuration conf = new Configuration(false);
+    ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf);
+
+    ServerStatistics stats = new ServerStatistics();
+    update(stats, 100);
+    assertEquals(ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF, backoff.getBackoffTime(server,
+        regionname, stats));
+
+    // another policy with a different max timeout
+    long max = 100;
+    conf.setLong(ExponentialClientBackoffPolicy.MAX_BACKOFF_KEY, max);
+    ExponentialClientBackoffPolicy backoffShortTimeout = new ExponentialClientBackoffPolicy(conf);
+    assertEquals(max, backoffShortTimeout.getBackoffTime(server, regionname, stats));
+
+    // test beyond 100 still doesn't exceed the max
+    update(stats, 101);
+    assertEquals(ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF, backoff.getBackoffTime(server,
+        regionname, stats));
+    assertEquals(max, backoffShortTimeout.getBackoffTime(server, regionname, stats));
+
+    // and that when we are below 100, it's less than the max timeout
+    update(stats, 99);
+    assertTrue(backoff.getBackoffTime(server,
+        regionname, stats) < ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF);
+    assertTrue(backoffShortTimeout.getBackoffTime(server, regionname, stats) < max);
+  }
+
+  /**
+   * Make sure that we get results in the order that we expect - backoff for a load of 1 should
+   * be less than backoff for 10, which should be less than that for 50.
+   */
+  @Test
+  public void testResultOrdering() {
+    Configuration conf = new Configuration(false);
+    // make the max timeout really high so we get differentiation between load factors
+    conf.setLong(ExponentialClientBackoffPolicy.MAX_BACKOFF_KEY, Integer.MAX_VALUE);
+    ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf);
+
+    ServerStatistics stats = new ServerStatistics();
+    long previous = backoff.getBackoffTime(server, regionname, stats);
+    for (int i = 1; i <= 100; i++) {
+      update(stats, i);
+      long next = backoff.getBackoffTime(server, regionname, stats);
+      assertTrue(
+          "Previous backoff time" + previous + " >= " + next + ", the next backoff time for " +
+              "load " + i, previous < next);
+      previous = next;
+    }
+  }
+
+  private void update(ServerStatistics stats, int load) {
+    ClientProtos.RegionLoadStats stat = ClientProtos.RegionLoadStats.newBuilder()
+        .setMemstoreLoad(load)
+        .build();
+    stats.update(regionname, stat);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java
index 080cd8b..e82e59d 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java
@@ -564,7 +564,7 @@ public class TestFastFailWithoutTestUtil {
 
   public RpcRetryingCaller<Void> getRpcRetryingCaller(int pauseTime,
       int retries, RetryingCallerInterceptor interceptor) {
-    return new RpcRetryingCaller<Void>(pauseTime, retries, interceptor, 9) {
+    return new RpcRetryingCallerImpl<Void>(pauseTime, retries, interceptor, 9) {
       @Override
       public Void callWithRetries(RetryingCallable<Void> callable,
           int callTimeout) throws IOException, RuntimeException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/a411227b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 6001767..33b71ad 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1099,6 +1099,12 @@ public final class HConstants {
   public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL =
       "hbase.client.fast.fail.interceptor.impl"; 
 
+  /** Config key for whether the server should send backpressure and whether the client
+   * should listen to that backpressure from the server */
+  public static final String ENABLE_CLIENT_BACKPRESSURE = "hbase.client.backpressure.enabled";
+  public static final boolean DEFAULT_ENABLE_CLIENT_BACKPRESSURE = false;
+
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }