You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2016/12/13 12:20:32 UTC

hbase git commit: HBASE-17296 Provide per peer throttling for replication (Guanghao Zhang)

Repository: hbase
Updated Branches:
  refs/heads/master adb319f5c -> 233359627


HBASE-17296 Provide per peer throttling for replication (Guanghao Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/23335962
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/23335962
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/23335962

Branch: refs/heads/master
Commit: 2333596279b63c045e5fd5be09b2fce8ce5c9980
Parents: adb319f
Author: tedyu <yu...@gmail.com>
Authored: Tue Dec 13 04:20:20 2016 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Tue Dec 13 04:20:20 2016 -0800

----------------------------------------------------------------------
 .../replication/ReplicationSerDeHelper.java     |   4 +
 .../hbase/replication/ReplicationPeer.java      |   6 +
 .../replication/ReplicationPeerConfig.java      |  13 ++-
 .../replication/ReplicationPeerZKImpl.java      |   5 +
 .../replication/ReplicationPeersZKImpl.java     |   1 +
 .../shaded/protobuf/generated/ClientProtos.java |  30 ++---
 .../protobuf/generated/ZooKeeperProtos.java     | 115 +++++++++++++++---
 .../src/main/protobuf/ZooKeeper.proto           |   1 +
 .../hbase/protobuf/generated/ClientProtos.java  |  30 ++---
 .../protobuf/generated/ZooKeeperProtos.java     | 117 ++++++++++++++++---
 .../src/main/protobuf/ZooKeeper.proto           |   1 +
 .../regionserver/ReplicationSource.java         |  32 ++++-
 .../regionserver/ReplicationThrottler.java      |   9 +-
 .../replication/TestReplicationAdmin.java       |  17 +++
 .../src/main/ruby/hbase/replication_admin.rb    |   9 ++
 hbase-shell/src/main/ruby/shell.rb              |   1 +
 .../src/main/ruby/shell/commands/list_peers.rb  |   5 +-
 .../ruby/shell/commands/set_peer_bandwidth.rb   |  42 +++++++
 .../test/ruby/hbase/replication_admin_test.rb   |  18 +++
 19 files changed, 393 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
index 6ac4417..dd83fb1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
@@ -287,6 +287,9 @@ public final class ReplicationSerDeHelper {
       }
       peerConfig.setNamespaces(namespaces);
     }
+    if (peer.hasBandwidth()) {
+      peerConfig.setBandwidth(peer.getBandwidth());
+    }
     return peerConfig;
   }
 
@@ -326,6 +329,7 @@ public final class ReplicationSerDeHelper {
       }
     }
 
+    builder.setBandwidth(peerConfig.getBandwidth());
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
index bd2b700..4f18048 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -78,6 +78,12 @@ public interface ReplicationPeer {
    */
   public Set<String> getNamespaces();
 
+  /**
+   * Get the per node bandwidth upper limit for this peer
+   * @return the bandwidth upper limit
+   */
+  public long getPeerBandwidth();
+
   void trackPeerConfigChanges(ReplicationPeerConfigListener listener);
 
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index 64f6d1b..790f021 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -43,6 +43,7 @@ public class ReplicationPeerConfig {
   private final Map<String, String> configuration;
   private Map<TableName, ? extends Collection<String>> tableCFsMap = null;
   private Set<String> namespaces = null;
+  private long bandwidth = 0;
 
   public ReplicationPeerConfig() {
     this.peerData = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
@@ -102,6 +103,15 @@ public class ReplicationPeerConfig {
     return this;
   }
 
+  public long getBandwidth() {
+    return this.bandwidth;
+  }
+
+  public ReplicationPeerConfig setBandwidth(long bandwidth) {
+    this.bandwidth = bandwidth;
+    return this;
+  }
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
@@ -110,8 +120,9 @@ public class ReplicationPeerConfig {
       builder.append("namespaces=").append(namespaces.toString()).append(",");
     }
     if (tableCFsMap != null) {
-      builder.append("tableCFs=").append(tableCFsMap.toString());
+      builder.append("tableCFs=").append(tableCFsMap.toString()).append(",");
     }
+    builder.append("bandwidth=").append(bandwidth);
     return builder.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
index 5302b1b..c58bd71 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
@@ -174,6 +174,11 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
   }
 
   @Override
+  public long getPeerBandwidth() {
+    return this.peerConfig.getBandwidth();
+  }
+
+  @Override
   public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
     if (this.peerConfigTracker != null){
       this.peerConfigTracker.setListener(listener);

http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index d12c4e9..9a617a7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -367,6 +367,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     existingConfig.getPeerData().putAll(newConfig.getPeerData());
     existingConfig.setTableCFsMap(newConfig.getTableCFsMap());
     existingConfig.setNamespaces(newConfig.getNamespaces());
+    existingConfig.setBandwidth(newConfig.getBandwidth());
 
     try {
       ZKUtil.setData(this.zookeeper, getPeerNode(id),

http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
index e9458df..eab62eb 100644
--- a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
@@ -19372,7 +19372,7 @@ public final class ClientProtos {
     /**
      * <pre>
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * </pre>
      *
@@ -19382,7 +19382,7 @@ public final class ClientProtos {
     /**
      * <pre>
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * </pre>
      *
@@ -19392,7 +19392,7 @@ public final class ClientProtos {
     /**
      * <pre>
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * </pre>
      *
@@ -19902,7 +19902,7 @@ public final class ClientProtos {
     /**
      * <pre>
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * </pre>
      *
@@ -19914,7 +19914,7 @@ public final class ClientProtos {
     /**
      * <pre>
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * </pre>
      *
@@ -19926,7 +19926,7 @@ public final class ClientProtos {
     /**
      * <pre>
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * </pre>
      *
@@ -21436,7 +21436,7 @@ public final class ClientProtos {
       /**
        * <pre>
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * </pre>
        *
@@ -21448,7 +21448,7 @@ public final class ClientProtos {
       /**
        * <pre>
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * </pre>
        *
@@ -21464,7 +21464,7 @@ public final class ClientProtos {
       /**
        * <pre>
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * </pre>
        *
@@ -21486,7 +21486,7 @@ public final class ClientProtos {
       /**
        * <pre>
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * </pre>
        *
@@ -21506,7 +21506,7 @@ public final class ClientProtos {
       /**
        * <pre>
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * </pre>
        *
@@ -21532,7 +21532,7 @@ public final class ClientProtos {
       /**
        * <pre>
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * </pre>
        *
@@ -21551,7 +21551,7 @@ public final class ClientProtos {
       /**
        * <pre>
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * </pre>
        *
@@ -21565,7 +21565,7 @@ public final class ClientProtos {
       /**
        * <pre>
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * </pre>
        *
@@ -21582,7 +21582,7 @@ public final class ClientProtos {
       /**
        * <pre>
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * </pre>
        *

http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ZooKeeperProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ZooKeeperProtos.java b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ZooKeeperProtos.java
index 6baf845..90ec659 100644
--- a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ZooKeeperProtos.java
+++ b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ZooKeeperProtos.java
@@ -4994,6 +4994,15 @@ public final class ZooKeeperProtos {
      * <code>repeated bytes namespaces = 6;</code>
      */
     org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString getNamespaces(int index);
+
+    /**
+     * <code>optional int64 bandwidth = 7;</code>
+     */
+    boolean hasBandwidth();
+    /**
+     * <code>optional int64 bandwidth = 7;</code>
+     */
+    long getBandwidth();
   }
   /**
    * <pre>
@@ -5018,6 +5027,7 @@ public final class ZooKeeperProtos {
       configuration_ = java.util.Collections.emptyList();
       tableCfs_ = java.util.Collections.emptyList();
       namespaces_ = java.util.Collections.emptyList();
+      bandwidth_ = 0L;
     }
 
     @java.lang.Override
@@ -5095,6 +5105,11 @@ public final class ZooKeeperProtos {
               namespaces_.add(input.readBytes());
               break;
             }
+            case 56: {
+              bitField0_ |= 0x00000004;
+              bandwidth_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException e) {
@@ -5358,6 +5373,21 @@ public final class ZooKeeperProtos {
       return namespaces_.get(index);
     }
 
+    public static final int BANDWIDTH_FIELD_NUMBER = 7;
+    private long bandwidth_;
+    /**
+     * <code>optional int64 bandwidth = 7;</code>
+     */
+    public boolean hasBandwidth() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional int64 bandwidth = 7;</code>
+     */
+    public long getBandwidth() {
+      return bandwidth_;
+    }
+
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
       byte isInitialized = memoizedIsInitialized;
@@ -5410,6 +5440,9 @@ public final class ZooKeeperProtos {
       for (int i = 0; i < namespaces_.size(); i++) {
         output.writeBytes(6, namespaces_.get(i));
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeInt64(7, bandwidth_);
+      }
       unknownFields.writeTo(output);
     }
 
@@ -5445,6 +5478,10 @@ public final class ZooKeeperProtos {
         size += dataSize;
         size += 1 * getNamespacesList().size();
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream
+          .computeInt64Size(7, bandwidth_);
+      }
       size += unknownFields.getSerializedSize();
       memoizedSize = size;
       return size;
@@ -5480,6 +5517,11 @@ public final class ZooKeeperProtos {
           .equals(other.getTableCfsList());
       result = result && getNamespacesList()
           .equals(other.getNamespacesList());
+      result = result && (hasBandwidth() == other.hasBandwidth());
+      if (hasBandwidth()) {
+        result = result && (getBandwidth()
+            == other.getBandwidth());
+      }
       result = result && unknownFields.equals(other.unknownFields);
       return result;
     }
@@ -5515,6 +5557,11 @@ public final class ZooKeeperProtos {
         hash = (37 * hash) + NAMESPACES_FIELD_NUMBER;
         hash = (53 * hash) + getNamespacesList().hashCode();
       }
+      if (hasBandwidth()) {
+        hash = (37 * hash) + BANDWIDTH_FIELD_NUMBER;
+        hash = (53 * hash) + org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.hashLong(
+            getBandwidth());
+      }
       hash = (29 * hash) + unknownFields.hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -5665,6 +5712,8 @@ public final class ZooKeeperProtos {
         }
         namespaces_ = java.util.Collections.emptyList();
         bitField0_ = (bitField0_ & ~0x00000020);
+        bandwidth_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
 
@@ -5729,6 +5778,10 @@ public final class ZooKeeperProtos {
           bitField0_ = (bitField0_ & ~0x00000020);
         }
         result.namespaces_ = namespaces_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.bandwidth_ = bandwidth_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -5869,6 +5922,9 @@ public final class ZooKeeperProtos {
           }
           onChanged();
         }
+        if (other.hasBandwidth()) {
+          setBandwidth(other.getBandwidth());
+        }
         this.mergeUnknownFields(other.unknownFields);
         onChanged();
         return this;
@@ -6888,6 +6944,38 @@ public final class ZooKeeperProtos {
         onChanged();
         return this;
       }
+
+      private long bandwidth_ ;
+      /**
+       * <code>optional int64 bandwidth = 7;</code>
+       */
+      public boolean hasBandwidth() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>optional int64 bandwidth = 7;</code>
+       */
+      public long getBandwidth() {
+        return bandwidth_;
+      }
+      /**
+       * <code>optional int64 bandwidth = 7;</code>
+       */
+      public Builder setBandwidth(long value) {
+        bitField0_ |= 0x00000040;
+        bandwidth_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 bandwidth = 7;</code>
+       */
+      public Builder clearBandwidth() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        bandwidth_ = 0L;
+        onChanged();
+        return this;
+      }
       public final Builder setUnknownFields(
           final org.apache.hadoop.hbase.shaded.com.google.protobuf.UnknownFieldSet unknownFields) {
         return super.setUnknownFields(unknownFields);
@@ -9803,23 +9891,24 @@ public final class ZooKeeperProtos {
       "e:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
       "BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"D\n\007T" +
       "ableCF\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb.Ta",
-      "bleName\022\020\n\010families\030\002 \003(\014\"\331\001\n\017Replicatio" +
+      "bleName\022\020\n\010families\030\002 \003(\014\"\354\001\n\017Replicatio" +
       "nPeer\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027replication" +
       "EndpointImpl\030\002 \001(\t\022&\n\004data\030\003 \003(\0132\030.hbase" +
       ".pb.BytesBytesPair\022/\n\rconfiguration\030\004 \003(" +
       "\0132\030.hbase.pb.NameStringPair\022$\n\ttable_cfs" +
       "\030\005 \003(\0132\021.hbase.pb.TableCF\022\022\n\nnamespaces\030" +
-      "\006 \003(\014\"g\n\020ReplicationState\022/\n\005state\030\001 \002(\016" +
-      "2 .hbase.pb.ReplicationState.State\"\"\n\005St" +
-      "ate\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027Replic" +
-      "ationHLogPosition\022\020\n\010position\030\001 \002(\003\"\252\001\n\t",
-      "TableLock\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb" +
-      ".TableName\022(\n\nlock_owner\030\002 \001(\0132\024.hbase.p" +
-      "b.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_sh" +
-      "ared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_ti" +
-      "me\030\006 \001(\003\"\036\n\013SwitchState\022\017\n\007enabled\030\001 \001(\010" +
-      "BL\n1org.apache.hadoop.hbase.shaded.proto" +
-      "buf.generatedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
+      "\006 \003(\014\022\021\n\tbandwidth\030\007 \001(\003\"g\n\020ReplicationS" +
+      "tate\022/\n\005state\030\001 \002(\0162 .hbase.pb.Replicati" +
+      "onState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010D" +
+      "ISABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n\010",
+      "position\030\001 \002(\003\"\252\001\n\tTableLock\022\'\n\ntable_na" +
+      "me\030\001 \001(\0132\023.hbase.pb.TableName\022(\n\nlock_ow" +
+      "ner\030\002 \001(\0132\024.hbase.pb.ServerName\022\021\n\tthrea" +
+      "d_id\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007purpose" +
+      "\030\005 \001(\t\022\023\n\013create_time\030\006 \001(\003\"\036\n\013SwitchSta" +
+      "te\022\017\n\007enabled\030\001 \001(\010BL\n1org.apache.hadoop" +
+      ".hbase.shaded.protobuf.generatedB\017ZooKee" +
+      "perProtosH\001\210\001\001\240\001\001"
     };
     org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
         new org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor.    InternalDescriptorAssigner() {
@@ -9876,7 +9965,7 @@ public final class ZooKeeperProtos {
     internal_static_hbase_pb_ReplicationPeer_fieldAccessorTable = new
       org.apache.hadoop.hbase.shaded.com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
         internal_static_hbase_pb_ReplicationPeer_descriptor,
-        new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Namespaces", });
+        new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Namespaces", "Bandwidth", });
     internal_static_hbase_pb_ReplicationState_descriptor =
       getDescriptor().getMessageTypes().get(7);
     internal_static_hbase_pb_ReplicationState_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-protocol-shaded/src/main/protobuf/ZooKeeper.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/ZooKeeper.proto b/hbase-protocol-shaded/src/main/protobuf/ZooKeeper.proto
index c66639b..323862c 100644
--- a/hbase-protocol-shaded/src/main/protobuf/ZooKeeper.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/ZooKeeper.proto
@@ -122,6 +122,7 @@ message ReplicationPeer {
   repeated NameStringPair configuration = 4;
   repeated TableCF table_cfs = 5;
   repeated bytes namespaces = 6;
+  optional int64 bandwidth = 7;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
index c35617b..d7e2b6f 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
@@ -19037,7 +19037,7 @@ public final class ClientProtos {
      *
      * <pre>
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * </pre>
      */
@@ -19047,7 +19047,7 @@ public final class ClientProtos {
      *
      * <pre>
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * </pre>
      */
@@ -19057,7 +19057,7 @@ public final class ClientProtos {
      *
      * <pre>
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * </pre>
      */
@@ -19591,7 +19591,7 @@ public final class ClientProtos {
      *
      * <pre>
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * </pre>
      */
@@ -19603,7 +19603,7 @@ public final class ClientProtos {
      *
      * <pre>
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * </pre>
      */
@@ -19615,7 +19615,7 @@ public final class ClientProtos {
      *
      * <pre>
      * This field is filled in if the client has requested that scan metrics be tracked.
-     * The metrics tracked here are sent back to the client to be tracked together with 
+     * The metrics tracked here are sent back to the client to be tracked together with
      * the existing client side metrics.
      * </pre>
      */
@@ -21117,7 +21117,7 @@ public final class ClientProtos {
        *
        * <pre>
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * </pre>
        */
@@ -21129,7 +21129,7 @@ public final class ClientProtos {
        *
        * <pre>
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * </pre>
        */
@@ -21145,7 +21145,7 @@ public final class ClientProtos {
        *
        * <pre>
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * </pre>
        */
@@ -21167,7 +21167,7 @@ public final class ClientProtos {
        *
        * <pre>
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * </pre>
        */
@@ -21187,7 +21187,7 @@ public final class ClientProtos {
        *
        * <pre>
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * </pre>
        */
@@ -21212,7 +21212,7 @@ public final class ClientProtos {
        *
        * <pre>
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * </pre>
        */
@@ -21231,7 +21231,7 @@ public final class ClientProtos {
        *
        * <pre>
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * </pre>
        */
@@ -21245,7 +21245,7 @@ public final class ClientProtos {
        *
        * <pre>
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * </pre>
        */
@@ -21261,7 +21261,7 @@ public final class ClientProtos {
        *
        * <pre>
        * This field is filled in if the client has requested that scan metrics be tracked.
-       * The metrics tracked here are sent back to the client to be tracked together with 
+       * The metrics tracked here are sent back to the client to be tracked together with
        * the existing client side metrics.
        * </pre>
        */

http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
index 36cd8b9..0095043 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
@@ -4796,6 +4796,16 @@ public final class ZooKeeperProtos {
      * <code>repeated bytes namespaces = 6;</code>
      */
     com.google.protobuf.ByteString getNamespaces(int index);
+
+    // optional int64 bandwidth = 7;
+    /**
+     * <code>optional int64 bandwidth = 7;</code>
+     */
+    boolean hasBandwidth();
+    /**
+     * <code>optional int64 bandwidth = 7;</code>
+     */
+    long getBandwidth();
   }
   /**
    * Protobuf type {@code hbase.pb.ReplicationPeer}
@@ -4895,6 +4905,11 @@ public final class ZooKeeperProtos {
               namespaces_.add(input.readBytes());
               break;
             }
+            case 56: {
+              bitField0_ |= 0x00000004;
+              bandwidth_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -5179,6 +5194,22 @@ public final class ZooKeeperProtos {
       return namespaces_.get(index);
     }
 
+    // optional int64 bandwidth = 7;
+    public static final int BANDWIDTH_FIELD_NUMBER = 7;
+    private long bandwidth_;
+    /**
+     * <code>optional int64 bandwidth = 7;</code>
+     */
+    public boolean hasBandwidth() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional int64 bandwidth = 7;</code>
+     */
+    public long getBandwidth() {
+      return bandwidth_;
+    }
+
     private void initFields() {
       clusterkey_ = "";
       replicationEndpointImpl_ = "";
@@ -5186,6 +5217,7 @@ public final class ZooKeeperProtos {
       configuration_ = java.util.Collections.emptyList();
       tableCfs_ = java.util.Collections.emptyList();
       namespaces_ = java.util.Collections.emptyList();
+      bandwidth_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -5239,6 +5271,9 @@ public final class ZooKeeperProtos {
       for (int i = 0; i < namespaces_.size(); i++) {
         output.writeBytes(6, namespaces_.get(i));
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeInt64(7, bandwidth_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -5277,6 +5312,10 @@ public final class ZooKeeperProtos {
         size += dataSize;
         size += 1 * getNamespacesList().size();
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(7, bandwidth_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -5318,6 +5357,11 @@ public final class ZooKeeperProtos {
           .equals(other.getTableCfsList());
       result = result && getNamespacesList()
           .equals(other.getNamespacesList());
+      result = result && (hasBandwidth() == other.hasBandwidth());
+      if (hasBandwidth()) {
+        result = result && (getBandwidth()
+            == other.getBandwidth());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -5355,6 +5399,10 @@ public final class ZooKeeperProtos {
         hash = (37 * hash) + NAMESPACES_FIELD_NUMBER;
         hash = (53 * hash) + getNamespacesList().hashCode();
       }
+      if (hasBandwidth()) {
+        hash = (37 * hash) + BANDWIDTH_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getBandwidth());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -5496,6 +5544,8 @@ public final class ZooKeeperProtos {
         }
         namespaces_ = java.util.Collections.emptyList();
         bitField0_ = (bitField0_ & ~0x00000020);
+        bandwidth_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
 
@@ -5564,6 +5614,10 @@ public final class ZooKeeperProtos {
           bitField0_ = (bitField0_ & ~0x00000020);
         }
         result.namespaces_ = namespaces_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.bandwidth_ = bandwidth_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -5678,6 +5732,9 @@ public final class ZooKeeperProtos {
           }
           onChanged();
         }
+        if (other.hasBandwidth()) {
+          setBandwidth(other.getBandwidth());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -6697,6 +6754,39 @@ public final class ZooKeeperProtos {
         return this;
       }
 
+      // optional int64 bandwidth = 7;
+      private long bandwidth_ ;
+      /**
+       * <code>optional int64 bandwidth = 7;</code>
+       */
+      public boolean hasBandwidth() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>optional int64 bandwidth = 7;</code>
+       */
+      public long getBandwidth() {
+        return bandwidth_;
+      }
+      /**
+       * <code>optional int64 bandwidth = 7;</code>
+       */
+      public Builder setBandwidth(long value) {
+        bitField0_ |= 0x00000040;
+        bandwidth_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 bandwidth = 7;</code>
+       */
+      public Builder clearBandwidth() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        bandwidth_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:hbase.pb.ReplicationPeer)
     }
 
@@ -9446,23 +9536,24 @@ public final class ZooKeeperProtos {
       "e:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
       "BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"D\n\007T" +
       "ableCF\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb.Ta",
-      "bleName\022\020\n\010families\030\002 \003(\014\"\331\001\n\017Replicatio" +
+      "bleName\022\020\n\010families\030\002 \003(\014\"\354\001\n\017Replicatio" +
       "nPeer\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027replication" +
       "EndpointImpl\030\002 \001(\t\022&\n\004data\030\003 \003(\0132\030.hbase" +
       ".pb.BytesBytesPair\022/\n\rconfiguration\030\004 \003(" +
       "\0132\030.hbase.pb.NameStringPair\022$\n\ttable_cfs" +
       "\030\005 \003(\0132\021.hbase.pb.TableCF\022\022\n\nnamespaces\030" +
-      "\006 \003(\014\"g\n\020ReplicationState\022/\n\005state\030\001 \002(\016" +
-      "2 .hbase.pb.ReplicationState.State\"\"\n\005St" +
-      "ate\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027Replic" +
-      "ationHLogPosition\022\020\n\010position\030\001 \002(\003\"\252\001\n\t",
-      "TableLock\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb" +
-      ".TableName\022(\n\nlock_owner\030\002 \001(\0132\024.hbase.p" +
-      "b.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_sh" +
-      "ared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_ti" +
-      "me\030\006 \001(\003\"\036\n\013SwitchState\022\017\n\007enabled\030\001 \001(\010" +
-      "BE\n*org.apache.hadoop.hbase.protobuf.gen" +
-      "eratedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
+      "\006 \003(\014\022\021\n\tbandwidth\030\007 \001(\003\"g\n\020ReplicationS" +
+      "tate\022/\n\005state\030\001 \002(\0162 .hbase.pb.Replicati" +
+      "onState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010D" +
+      "ISABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n\010",
+      "position\030\001 \002(\003\"\252\001\n\tTableLock\022\'\n\ntable_na" +
+      "me\030\001 \001(\0132\023.hbase.pb.TableName\022(\n\nlock_ow" +
+      "ner\030\002 \001(\0132\024.hbase.pb.ServerName\022\021\n\tthrea" +
+      "d_id\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007purpose" +
+      "\030\005 \001(\t\022\023\n\013create_time\030\006 \001(\003\"\036\n\013SwitchSta" +
+      "te\022\017\n\007enabled\030\001 \001(\010BE\n*org.apache.hadoop" +
+      ".hbase.protobuf.generatedB\017ZooKeeperProt" +
+      "osH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -9510,7 +9601,7 @@ public final class ZooKeeperProtos {
           internal_static_hbase_pb_ReplicationPeer_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_ReplicationPeer_descriptor,
-              new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Namespaces", });
+              new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Namespaces", "Bandwidth", });
           internal_static_hbase_pb_ReplicationState_descriptor =
             getDescriptor().getMessageTypes().get(7);
           internal_static_hbase_pb_ReplicationState_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-protocol/src/main/protobuf/ZooKeeper.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/ZooKeeper.proto b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
index a0c9d01..6f13e4a 100644
--- a/hbase-protocol/src/main/protobuf/ZooKeeper.proto
+++ b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
@@ -122,6 +122,7 @@ message ReplicationPeer {
   repeated NameStringPair configuration = 4;
   repeated TableCF table_cfs = 5;
   repeated bytes namespaces = 6;
+  optional int64 bandwidth = 7;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 388efbf..a6fe0fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
@@ -144,6 +145,8 @@ public class ReplicationSource extends Thread
   private WALEntryFilter walEntryFilter;
   // throttler
   private ReplicationThrottler throttler;
+  private long defaultBandwidth;
+  private long currentBandwidth;
   private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
       new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
 
@@ -179,8 +182,6 @@ public class ReplicationSource extends Thread
     this.maxRetriesMultiplier =
         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
     this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
-    long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
-    this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
     this.replicationQueues = replicationQueues;
     this.replicationPeers = replicationPeers;
     this.manager = manager;
@@ -196,6 +197,15 @@ public class ReplicationSource extends Thread
     this.actualPeerId = replicationQueueInfo.getPeerId();
     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
     this.replicationEndpoint = replicationEndpoint;
+
+    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
+    currentBandwidth = getCurrentBandwidth();
+    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
+
+    LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
+        + " inited, replicationQueueSizeCapacity=" + replicationQueueSizeCapacity
+        + ", replicationQueueNbCapacity=" + replicationQueueNbCapacity + ", curerntBandwidth="
+        + this.currentBandwidth);
   }
 
   private void decorateConf() {
@@ -494,6 +504,13 @@ public class ReplicationSource extends Thread
     return this.metrics;
   }
 
+  private long getCurrentBandwidth() {
+    ReplicationPeer replicationPeer = this.replicationPeers.getConnectedPeer(peerId);
+    long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0;
+    // user can set peer bandwidth to 0 to use default bandwidth
+    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
+  }
+
   public class ReplicationSourceWorkerThread extends Thread {
     ReplicationSource source;
     String walGroupId;
@@ -1087,6 +1104,16 @@ public class ReplicationSource extends Thread
       return distinctRowKeys + totalHFileEntries;
     }
 
+    private void checkBandwidthChangeAndResetThrottler() {
+      long peerBandwidth = getCurrentBandwidth();
+      if (peerBandwidth != currentBandwidth) {
+        currentBandwidth = peerBandwidth;
+        throttler.setBandwidth((double) currentBandwidth / 10.0);
+        LOG.info("ReplicationSource : " + peerId
+            + " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
+      }
+    }
+
     /**
      * Do the shipping logic
      * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
@@ -1101,6 +1128,7 @@ public class ReplicationSource extends Thread
       }
       while (isWorkerActive()) {
         try {
+          checkBandwidthChangeAndResetThrottler();
           if (throttler.isEnabled()) {
             long sleepTicks = throttler.getNextSleepInterval(currentSize);
             if (sleepTicks > 0) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
index c756576..8da9352 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
@@ -27,8 +27,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  */
 @InterfaceAudience.Private
 public class ReplicationThrottler {
-  private final boolean enabled;
-  private final double bandwidth;
+  private boolean enabled;
+  private double bandwidth;
   private long cyclePushSize;
   private long cycleStartTick;
 
@@ -118,4 +118,9 @@ public class ReplicationThrottler {
       this.cycleStartTick = EnvironmentEdgeManager.currentTime();
     }
   }
+
+  public void setBandwidth(double bandwidth) {
+    this.bandwidth = bandwidth;
+    this.enabled = this.bandwidth > 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index c0d18dd..7363fb9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -469,4 +469,21 @@ public class TestReplicationAdmin {
 
     admin.removePeer(ID_ONE);
   }
+
+  @Test
+  public void testPeerBandwidth() throws ReplicationException {
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(KEY_ONE);
+    admin.addPeer(ID_ONE, rpc);
+    admin.peerAdded(ID_ONE);
+
+    rpc = admin.getPeerConfig(ID_ONE);
+    assertEquals(0, rpc.getBandwidth());
+
+    rpc.setBandwidth(2097152);
+    admin.updatePeerConfig(ID_ONE, rpc);
+
+    assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth());
+    admin.removePeer(ID_ONE);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-shell/src/main/ruby/hbase/replication_admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
index 8aa158b..5fd23d3 100644
--- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
@@ -252,6 +252,15 @@ module Hbase
       end
     end
 
+    # Set new bandwidth config for the specified peer
+    def set_peer_bandwidth(id, bandwidth)
+      rpc = get_peer_config(id)
+      unless rpc.nil?
+        rpc.setBandwidth(bandwidth)
+        @replication_admin.updatePeerConfig(id, rpc)
+      end
+    end
+
     #----------------------------------------------------------------------------------------------
     # Enables a table's replication switch
     def enable_tablerep(table_name)

http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-shell/src/main/ruby/shell.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb
index 02f8191..4b111f1 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -375,6 +375,7 @@ Shell.load_command_group(
     remove_peer_namespaces
     show_peer_tableCFs
     set_peer_tableCFs
+    set_peer_bandwidth
     list_replicated_tables
     append_peer_tableCFs
     remove_peer_tableCFs

http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
index ed6b575..7d53158 100644
--- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
@@ -33,14 +33,15 @@ EOF
         peers = replication_admin.list_peers
 
         formatter.header(["PEER_ID", "CLUSTER_KEY", "ENDPOINT_CLASSNAME",
-          "STATE", "NAMESPACES", "TABLE_CFS"])
+          "STATE", "NAMESPACES", "TABLE_CFS", "BANDWIDTH"])
 
         peers.entrySet().each do |e|
           state = replication_admin.get_peer_state(e.key)
           namespaces = replication_admin.show_peer_namespaces(e.value)
           tableCFs = replication_admin.show_peer_tableCFs(e.key)
           formatter.row([ e.key, e.value.getClusterKey,
-            e.value.getReplicationEndpointImpl, state, namespaces, tableCFs ])
+            e.value.getReplicationEndpointImpl, state, namespaces, tableCFs,
+            e.value.getBandwidth ])
         end
 
         formatter.footer()

http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-shell/src/main/ruby/shell/commands/set_peer_bandwidth.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_bandwidth.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_bandwidth.rb
new file mode 100644
index 0000000..d9495af
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_bandwidth.rb
@@ -0,0 +1,42 @@
+#
+# Copyright The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class SetPeerBandwidth< Command
+      def help
+        return <<-EOF
+Set the replication source per node bandwidth for the specified peer.
+Examples:
+
+  # set bandwidth=2MB per regionserver for a peer
+  hbase> set_peer_bandwidth '1', 2097152
+  # unset bandwidth for a peer to use the default bandwidth configured in server-side
+  hbase> set_peer_bandwidth '1'
+
+EOF
+      end
+
+      def command(id, bandwidth = 0)
+        replication_admin.set_peer_bandwidth(id, bandwidth)
+      end
+    end
+  end
+end

http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
index 11ff603..cd1fe35 100644
--- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
@@ -371,6 +371,24 @@ module Hbase
       command(:remove_peer, @peer_id)
     end
 
+    define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do
+      cluster_key = "localhost:2181:/hbase-test"
+      args = { CLUSTER_KEY => cluster_key }
+      command(:add_peer, @peer_id, args)
+      # Normally the ReplicationSourceManager will call ReplicationPeer#peer_added
+      # but here we have to do it ourselves
+      replication_admin.peer_added(@peer_id)
+
+      peer_config = command(:get_peer_config, @peer_id)
+      assert_equal(0, peer_config.get_bandwidth)
+      command(:set_peer_bandwidth, @peer_id, 2097152)
+      peer_config = command(:get_peer_config, @peer_id)
+      assert_equal(2097152, peer_config.get_bandwidth)
+
+      #cleanup
+      command(:remove_peer, @peer_id)
+    end
+
     define_test "get_peer_config: works with simple clusterKey peer" do
       cluster_key = "localhost:2181:/hbase-test"
       args = { CLUSTER_KEY => cluster_key }