You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2016/12/13 12:20:32 UTC
hbase git commit: HBASE-17296 Provide per peer throttling for replication (Guanghao Zhang)
Repository: hbase
Updated Branches:
refs/heads/master adb319f5c -> 233359627
HBASE-17296 Provide per peer throttling for replication (Guanghao Zhang)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/23335962
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/23335962
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/23335962
Branch: refs/heads/master
Commit: 2333596279b63c045e5fd5be09b2fce8ce5c9980
Parents: adb319f
Author: tedyu <yu...@gmail.com>
Authored: Tue Dec 13 04:20:20 2016 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Tue Dec 13 04:20:20 2016 -0800
----------------------------------------------------------------------
.../replication/ReplicationSerDeHelper.java | 4 +
.../hbase/replication/ReplicationPeer.java | 6 +
.../replication/ReplicationPeerConfig.java | 13 ++-
.../replication/ReplicationPeerZKImpl.java | 5 +
.../replication/ReplicationPeersZKImpl.java | 1 +
.../shaded/protobuf/generated/ClientProtos.java | 30 ++---
.../protobuf/generated/ZooKeeperProtos.java | 115 +++++++++++++++---
.../src/main/protobuf/ZooKeeper.proto | 1 +
.../hbase/protobuf/generated/ClientProtos.java | 30 ++---
.../protobuf/generated/ZooKeeperProtos.java | 117 ++++++++++++++++---
.../src/main/protobuf/ZooKeeper.proto | 1 +
.../regionserver/ReplicationSource.java | 32 ++++-
.../regionserver/ReplicationThrottler.java | 9 +-
.../replication/TestReplicationAdmin.java | 17 +++
.../src/main/ruby/hbase/replication_admin.rb | 9 ++
hbase-shell/src/main/ruby/shell.rb | 1 +
.../src/main/ruby/shell/commands/list_peers.rb | 5 +-
.../ruby/shell/commands/set_peer_bandwidth.rb | 42 +++++++
.../test/ruby/hbase/replication_admin_test.rb | 18 +++
19 files changed, 393 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
index 6ac4417..dd83fb1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
@@ -287,6 +287,9 @@ public final class ReplicationSerDeHelper {
}
peerConfig.setNamespaces(namespaces);
}
+ if (peer.hasBandwidth()) {
+ peerConfig.setBandwidth(peer.getBandwidth());
+ }
return peerConfig;
}
@@ -326,6 +329,7 @@ public final class ReplicationSerDeHelper {
}
}
+ builder.setBandwidth(peerConfig.getBandwidth());
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
index bd2b700..4f18048 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -78,6 +78,12 @@ public interface ReplicationPeer {
*/
public Set<String> getNamespaces();
+ /**
+ * Get the per node bandwidth upper limit for this peer
+ * @return the bandwidth upper limit
+ */
+ public long getPeerBandwidth();
+
void trackPeerConfigChanges(ReplicationPeerConfigListener listener);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index 64f6d1b..790f021 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -43,6 +43,7 @@ public class ReplicationPeerConfig {
private final Map<String, String> configuration;
private Map<TableName, ? extends Collection<String>> tableCFsMap = null;
private Set<String> namespaces = null;
+ private long bandwidth = 0;
public ReplicationPeerConfig() {
this.peerData = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
@@ -102,6 +103,15 @@ public class ReplicationPeerConfig {
return this;
}
+ public long getBandwidth() {
+ return this.bandwidth;
+ }
+
+ public ReplicationPeerConfig setBandwidth(long bandwidth) {
+ this.bandwidth = bandwidth;
+ return this;
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
@@ -110,8 +120,9 @@ public class ReplicationPeerConfig {
builder.append("namespaces=").append(namespaces.toString()).append(",");
}
if (tableCFsMap != null) {
- builder.append("tableCFs=").append(tableCFsMap.toString());
+ builder.append("tableCFs=").append(tableCFsMap.toString()).append(",");
}
+ builder.append("bandwidth=").append(bandwidth);
return builder.toString();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
index 5302b1b..c58bd71 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
@@ -174,6 +174,11 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
}
@Override
+ public long getPeerBandwidth() {
+ return this.peerConfig.getBandwidth();
+ }
+
+ @Override
public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
if (this.peerConfigTracker != null){
this.peerConfigTracker.setListener(listener);
http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index d12c4e9..9a617a7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -367,6 +367,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
existingConfig.getPeerData().putAll(newConfig.getPeerData());
existingConfig.setTableCFsMap(newConfig.getTableCFsMap());
existingConfig.setNamespaces(newConfig.getNamespaces());
+ existingConfig.setBandwidth(newConfig.getBandwidth());
try {
ZKUtil.setData(this.zookeeper, getPeerNode(id),
http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
index e9458df..eab62eb 100644
--- a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
@@ -19372,7 +19372,7 @@ public final class ClientProtos {
/**
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*
@@ -19382,7 +19382,7 @@ public final class ClientProtos {
/**
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*
@@ -19392,7 +19392,7 @@ public final class ClientProtos {
/**
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*
@@ -19902,7 +19902,7 @@ public final class ClientProtos {
/**
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*
@@ -19914,7 +19914,7 @@ public final class ClientProtos {
/**
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*
@@ -19926,7 +19926,7 @@ public final class ClientProtos {
/**
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*
@@ -21436,7 +21436,7 @@ public final class ClientProtos {
/**
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*
@@ -21448,7 +21448,7 @@ public final class ClientProtos {
/**
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*
@@ -21464,7 +21464,7 @@ public final class ClientProtos {
/**
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*
@@ -21486,7 +21486,7 @@ public final class ClientProtos {
/**
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*
@@ -21506,7 +21506,7 @@ public final class ClientProtos {
/**
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*
@@ -21532,7 +21532,7 @@ public final class ClientProtos {
/**
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*
@@ -21551,7 +21551,7 @@ public final class ClientProtos {
/**
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*
@@ -21565,7 +21565,7 @@ public final class ClientProtos {
/**
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*
@@ -21582,7 +21582,7 @@ public final class ClientProtos {
/**
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*
http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ZooKeeperProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ZooKeeperProtos.java b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ZooKeeperProtos.java
index 6baf845..90ec659 100644
--- a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ZooKeeperProtos.java
+++ b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ZooKeeperProtos.java
@@ -4994,6 +4994,15 @@ public final class ZooKeeperProtos {
* <code>repeated bytes namespaces = 6;</code>
*/
org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString getNamespaces(int index);
+
+ /**
+ * <code>optional int64 bandwidth = 7;</code>
+ */
+ boolean hasBandwidth();
+ /**
+ * <code>optional int64 bandwidth = 7;</code>
+ */
+ long getBandwidth();
}
/**
* <pre>
@@ -5018,6 +5027,7 @@ public final class ZooKeeperProtos {
configuration_ = java.util.Collections.emptyList();
tableCfs_ = java.util.Collections.emptyList();
namespaces_ = java.util.Collections.emptyList();
+ bandwidth_ = 0L;
}
@java.lang.Override
@@ -5095,6 +5105,11 @@ public final class ZooKeeperProtos {
namespaces_.add(input.readBytes());
break;
}
+ case 56: {
+ bitField0_ |= 0x00000004;
+ bandwidth_ = input.readInt64();
+ break;
+ }
}
}
} catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException e) {
@@ -5358,6 +5373,21 @@ public final class ZooKeeperProtos {
return namespaces_.get(index);
}
+ public static final int BANDWIDTH_FIELD_NUMBER = 7;
+ private long bandwidth_;
+ /**
+ * <code>optional int64 bandwidth = 7;</code>
+ */
+ public boolean hasBandwidth() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>optional int64 bandwidth = 7;</code>
+ */
+ public long getBandwidth() {
+ return bandwidth_;
+ }
+
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
@@ -5410,6 +5440,9 @@ public final class ZooKeeperProtos {
for (int i = 0; i < namespaces_.size(); i++) {
output.writeBytes(6, namespaces_.get(i));
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeInt64(7, bandwidth_);
+ }
unknownFields.writeTo(output);
}
@@ -5445,6 +5478,10 @@ public final class ZooKeeperProtos {
size += dataSize;
size += 1 * getNamespacesList().size();
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream
+ .computeInt64Size(7, bandwidth_);
+ }
size += unknownFields.getSerializedSize();
memoizedSize = size;
return size;
@@ -5480,6 +5517,11 @@ public final class ZooKeeperProtos {
.equals(other.getTableCfsList());
result = result && getNamespacesList()
.equals(other.getNamespacesList());
+ result = result && (hasBandwidth() == other.hasBandwidth());
+ if (hasBandwidth()) {
+ result = result && (getBandwidth()
+ == other.getBandwidth());
+ }
result = result && unknownFields.equals(other.unknownFields);
return result;
}
@@ -5515,6 +5557,11 @@ public final class ZooKeeperProtos {
hash = (37 * hash) + NAMESPACES_FIELD_NUMBER;
hash = (53 * hash) + getNamespacesList().hashCode();
}
+ if (hasBandwidth()) {
+ hash = (37 * hash) + BANDWIDTH_FIELD_NUMBER;
+ hash = (53 * hash) + org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.hashLong(
+ getBandwidth());
+ }
hash = (29 * hash) + unknownFields.hashCode();
memoizedHashCode = hash;
return hash;
@@ -5665,6 +5712,8 @@ public final class ZooKeeperProtos {
}
namespaces_ = java.util.Collections.emptyList();
bitField0_ = (bitField0_ & ~0x00000020);
+ bandwidth_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@@ -5729,6 +5778,10 @@ public final class ZooKeeperProtos {
bitField0_ = (bitField0_ & ~0x00000020);
}
result.namespaces_ = namespaces_;
+ if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.bandwidth_ = bandwidth_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -5869,6 +5922,9 @@ public final class ZooKeeperProtos {
}
onChanged();
}
+ if (other.hasBandwidth()) {
+ setBandwidth(other.getBandwidth());
+ }
this.mergeUnknownFields(other.unknownFields);
onChanged();
return this;
@@ -6888,6 +6944,38 @@ public final class ZooKeeperProtos {
onChanged();
return this;
}
+
+ private long bandwidth_ ;
+ /**
+ * <code>optional int64 bandwidth = 7;</code>
+ */
+ public boolean hasBandwidth() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * <code>optional int64 bandwidth = 7;</code>
+ */
+ public long getBandwidth() {
+ return bandwidth_;
+ }
+ /**
+ * <code>optional int64 bandwidth = 7;</code>
+ */
+ public Builder setBandwidth(long value) {
+ bitField0_ |= 0x00000040;
+ bandwidth_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int64 bandwidth = 7;</code>
+ */
+ public Builder clearBandwidth() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ bandwidth_ = 0L;
+ onChanged();
+ return this;
+ }
public final Builder setUnknownFields(
final org.apache.hadoop.hbase.shaded.com.google.protobuf.UnknownFieldSet unknownFields) {
return super.setUnknownFields(unknownFields);
@@ -9803,23 +9891,24 @@ public final class ZooKeeperProtos {
"e:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
"BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"D\n\007T" +
"ableCF\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb.Ta",
- "bleName\022\020\n\010families\030\002 \003(\014\"\331\001\n\017Replicatio" +
+ "bleName\022\020\n\010families\030\002 \003(\014\"\354\001\n\017Replicatio" +
"nPeer\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027replication" +
"EndpointImpl\030\002 \001(\t\022&\n\004data\030\003 \003(\0132\030.hbase" +
".pb.BytesBytesPair\022/\n\rconfiguration\030\004 \003(" +
"\0132\030.hbase.pb.NameStringPair\022$\n\ttable_cfs" +
"\030\005 \003(\0132\021.hbase.pb.TableCF\022\022\n\nnamespaces\030" +
- "\006 \003(\014\"g\n\020ReplicationState\022/\n\005state\030\001 \002(\016" +
- "2 .hbase.pb.ReplicationState.State\"\"\n\005St" +
- "ate\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027Replic" +
- "ationHLogPosition\022\020\n\010position\030\001 \002(\003\"\252\001\n\t",
- "TableLock\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb" +
- ".TableName\022(\n\nlock_owner\030\002 \001(\0132\024.hbase.p" +
- "b.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_sh" +
- "ared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_ti" +
- "me\030\006 \001(\003\"\036\n\013SwitchState\022\017\n\007enabled\030\001 \001(\010" +
- "BL\n1org.apache.hadoop.hbase.shaded.proto" +
- "buf.generatedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
+ "\006 \003(\014\022\021\n\tbandwidth\030\007 \001(\003\"g\n\020ReplicationS" +
+ "tate\022/\n\005state\030\001 \002(\0162 .hbase.pb.Replicati" +
+ "onState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010D" +
+ "ISABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n\010",
+ "position\030\001 \002(\003\"\252\001\n\tTableLock\022\'\n\ntable_na" +
+ "me\030\001 \001(\0132\023.hbase.pb.TableName\022(\n\nlock_ow" +
+ "ner\030\002 \001(\0132\024.hbase.pb.ServerName\022\021\n\tthrea" +
+ "d_id\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007purpose" +
+ "\030\005 \001(\t\022\023\n\013create_time\030\006 \001(\003\"\036\n\013SwitchSta" +
+ "te\022\017\n\007enabled\030\001 \001(\010BL\n1org.apache.hadoop" +
+ ".hbase.shaded.protobuf.generatedB\017ZooKee" +
+ "perProtosH\001\210\001\001\240\001\001"
};
org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() {
@@ -9876,7 +9965,7 @@ public final class ZooKeeperProtos {
internal_static_hbase_pb_ReplicationPeer_fieldAccessorTable = new
org.apache.hadoop.hbase.shaded.com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
internal_static_hbase_pb_ReplicationPeer_descriptor,
- new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Namespaces", });
+ new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Namespaces", "Bandwidth", });
internal_static_hbase_pb_ReplicationState_descriptor =
getDescriptor().getMessageTypes().get(7);
internal_static_hbase_pb_ReplicationState_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-protocol-shaded/src/main/protobuf/ZooKeeper.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/ZooKeeper.proto b/hbase-protocol-shaded/src/main/protobuf/ZooKeeper.proto
index c66639b..323862c 100644
--- a/hbase-protocol-shaded/src/main/protobuf/ZooKeeper.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/ZooKeeper.proto
@@ -122,6 +122,7 @@ message ReplicationPeer {
repeated NameStringPair configuration = 4;
repeated TableCF table_cfs = 5;
repeated bytes namespaces = 6;
+ optional int64 bandwidth = 7;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
index c35617b..d7e2b6f 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
@@ -19037,7 +19037,7 @@ public final class ClientProtos {
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
@@ -19047,7 +19047,7 @@ public final class ClientProtos {
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
@@ -19057,7 +19057,7 @@ public final class ClientProtos {
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
@@ -19591,7 +19591,7 @@ public final class ClientProtos {
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
@@ -19603,7 +19603,7 @@ public final class ClientProtos {
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
@@ -19615,7 +19615,7 @@ public final class ClientProtos {
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
@@ -21117,7 +21117,7 @@ public final class ClientProtos {
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
@@ -21129,7 +21129,7 @@ public final class ClientProtos {
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
@@ -21145,7 +21145,7 @@ public final class ClientProtos {
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
@@ -21167,7 +21167,7 @@ public final class ClientProtos {
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
@@ -21187,7 +21187,7 @@ public final class ClientProtos {
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
@@ -21212,7 +21212,7 @@ public final class ClientProtos {
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
@@ -21231,7 +21231,7 @@ public final class ClientProtos {
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
@@ -21245,7 +21245,7 @@ public final class ClientProtos {
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
@@ -21261,7 +21261,7 @@ public final class ClientProtos {
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
- * The metrics tracked here are sent back to the client to be tracked together with
+ * The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
index 36cd8b9..0095043 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
@@ -4796,6 +4796,16 @@ public final class ZooKeeperProtos {
* <code>repeated bytes namespaces = 6;</code>
*/
com.google.protobuf.ByteString getNamespaces(int index);
+
+ // optional int64 bandwidth = 7;
+ /**
+ * <code>optional int64 bandwidth = 7;</code>
+ */
+ boolean hasBandwidth();
+ /**
+ * <code>optional int64 bandwidth = 7;</code>
+ */
+ long getBandwidth();
}
/**
* Protobuf type {@code hbase.pb.ReplicationPeer}
@@ -4895,6 +4905,11 @@ public final class ZooKeeperProtos {
namespaces_.add(input.readBytes());
break;
}
+ case 56: {
+ bitField0_ |= 0x00000004;
+ bandwidth_ = input.readInt64();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -5179,6 +5194,22 @@ public final class ZooKeeperProtos {
return namespaces_.get(index);
}
+ // optional int64 bandwidth = 7;
+ public static final int BANDWIDTH_FIELD_NUMBER = 7;
+ private long bandwidth_;
+ /**
+ * <code>optional int64 bandwidth = 7;</code>
+ */
+ public boolean hasBandwidth() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>optional int64 bandwidth = 7;</code>
+ */
+ public long getBandwidth() {
+ return bandwidth_;
+ }
+
private void initFields() {
clusterkey_ = "";
replicationEndpointImpl_ = "";
@@ -5186,6 +5217,7 @@ public final class ZooKeeperProtos {
configuration_ = java.util.Collections.emptyList();
tableCfs_ = java.util.Collections.emptyList();
namespaces_ = java.util.Collections.emptyList();
+ bandwidth_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -5239,6 +5271,9 @@ public final class ZooKeeperProtos {
for (int i = 0; i < namespaces_.size(); i++) {
output.writeBytes(6, namespaces_.get(i));
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeInt64(7, bandwidth_);
+ }
getUnknownFields().writeTo(output);
}
@@ -5277,6 +5312,10 @@ public final class ZooKeeperProtos {
size += dataSize;
size += 1 * getNamespacesList().size();
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt64Size(7, bandwidth_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -5318,6 +5357,11 @@ public final class ZooKeeperProtos {
.equals(other.getTableCfsList());
result = result && getNamespacesList()
.equals(other.getNamespacesList());
+ result = result && (hasBandwidth() == other.hasBandwidth());
+ if (hasBandwidth()) {
+ result = result && (getBandwidth()
+ == other.getBandwidth());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -5355,6 +5399,10 @@ public final class ZooKeeperProtos {
hash = (37 * hash) + NAMESPACES_FIELD_NUMBER;
hash = (53 * hash) + getNamespacesList().hashCode();
}
+ if (hasBandwidth()) {
+ hash = (37 * hash) + BANDWIDTH_FIELD_NUMBER;
+ hash = (53 * hash) + hashLong(getBandwidth());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -5496,6 +5544,8 @@ public final class ZooKeeperProtos {
}
namespaces_ = java.util.Collections.emptyList();
bitField0_ = (bitField0_ & ~0x00000020);
+ bandwidth_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@@ -5564,6 +5614,10 @@ public final class ZooKeeperProtos {
bitField0_ = (bitField0_ & ~0x00000020);
}
result.namespaces_ = namespaces_;
+ if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.bandwidth_ = bandwidth_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -5678,6 +5732,9 @@ public final class ZooKeeperProtos {
}
onChanged();
}
+ if (other.hasBandwidth()) {
+ setBandwidth(other.getBandwidth());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -6697,6 +6754,39 @@ public final class ZooKeeperProtos {
return this;
}
+ // optional int64 bandwidth = 7;
+ private long bandwidth_ ;
+ /**
+ * <code>optional int64 bandwidth = 7;</code>
+ *
+ * In the builder, presence of the field is tracked by bit 0x00000040 of bitField0_.
+ *
+ * @return true when that presence bit is set
+ */
+ public boolean hasBandwidth() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * <code>optional int64 bandwidth = 7;</code>
+ *
+ * Returns the stored value without consulting the presence bit.
+ *
+ * @return the current content of bandwidth_
+ */
+ public long getBandwidth() {
+ return bandwidth_;
+ }
+ /**
+ * <code>optional int64 bandwidth = 7;</code>
+ *
+ * Stores the value, marks the field as present and notifies via onChanged().
+ *
+ * @param value the bandwidth to store
+ * @return this builder, for call chaining
+ */
+ public Builder setBandwidth(long value) {
+ bitField0_ |= 0x00000040;
+ bandwidth_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int64 bandwidth = 7;</code>
+ *
+ * Clears the presence bit, resets the stored value to 0L and notifies via onChanged().
+ *
+ * @return this builder, for call chaining
+ */
+ public Builder clearBandwidth() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ bandwidth_ = 0L;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:hbase.pb.ReplicationPeer)
}
@@ -9446,23 +9536,24 @@ public final class ZooKeeperProtos {
"e:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
"BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"D\n\007T" +
"ableCF\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb.Ta",
- "bleName\022\020\n\010families\030\002 \003(\014\"\331\001\n\017Replicatio" +
+ "bleName\022\020\n\010families\030\002 \003(\014\"\354\001\n\017Replicatio" +
"nPeer\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027replication" +
"EndpointImpl\030\002 \001(\t\022&\n\004data\030\003 \003(\0132\030.hbase" +
".pb.BytesBytesPair\022/\n\rconfiguration\030\004 \003(" +
"\0132\030.hbase.pb.NameStringPair\022$\n\ttable_cfs" +
"\030\005 \003(\0132\021.hbase.pb.TableCF\022\022\n\nnamespaces\030" +
- "\006 \003(\014\"g\n\020ReplicationState\022/\n\005state\030\001 \002(\016" +
- "2 .hbase.pb.ReplicationState.State\"\"\n\005St" +
- "ate\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027Replic" +
- "ationHLogPosition\022\020\n\010position\030\001 \002(\003\"\252\001\n\t",
- "TableLock\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb" +
- ".TableName\022(\n\nlock_owner\030\002 \001(\0132\024.hbase.p" +
- "b.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_sh" +
- "ared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_ti" +
- "me\030\006 \001(\003\"\036\n\013SwitchState\022\017\n\007enabled\030\001 \001(\010" +
- "BE\n*org.apache.hadoop.hbase.protobuf.gen" +
- "eratedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
+ "\006 \003(\014\022\021\n\tbandwidth\030\007 \001(\003\"g\n\020ReplicationS" +
+ "tate\022/\n\005state\030\001 \002(\0162 .hbase.pb.Replicati" +
+ "onState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010D" +
+ "ISABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n\010",
+ "position\030\001 \002(\003\"\252\001\n\tTableLock\022\'\n\ntable_na" +
+ "me\030\001 \001(\0132\023.hbase.pb.TableName\022(\n\nlock_ow" +
+ "ner\030\002 \001(\0132\024.hbase.pb.ServerName\022\021\n\tthrea" +
+ "d_id\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007purpose" +
+ "\030\005 \001(\t\022\023\n\013create_time\030\006 \001(\003\"\036\n\013SwitchSta" +
+ "te\022\017\n\007enabled\030\001 \001(\010BE\n*org.apache.hadoop" +
+ ".hbase.protobuf.generatedB\017ZooKeeperProt" +
+ "osH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -9510,7 +9601,7 @@ public final class ZooKeeperProtos {
internal_static_hbase_pb_ReplicationPeer_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_hbase_pb_ReplicationPeer_descriptor,
- new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Namespaces", });
+ new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Namespaces", "Bandwidth", });
internal_static_hbase_pb_ReplicationState_descriptor =
getDescriptor().getMessageTypes().get(7);
internal_static_hbase_pb_ReplicationState_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-protocol/src/main/protobuf/ZooKeeper.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/ZooKeeper.proto b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
index a0c9d01..6f13e4a 100644
--- a/hbase-protocol/src/main/protobuf/ZooKeeper.proto
+++ b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
@@ -122,6 +122,7 @@ message ReplicationPeer {
repeated NameStringPair configuration = 4;
repeated TableCF table_cfs = 5;
repeated bytes namespaces = 6;
+ optional int64 bandwidth = 7;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 388efbf..a6fe0fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
@@ -144,6 +145,8 @@ public class ReplicationSource extends Thread
private WALEntryFilter walEntryFilter;
// throttler
private ReplicationThrottler throttler;
+ private long defaultBandwidth;
+ private long currentBandwidth;
private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
@@ -179,8 +182,6 @@ public class ReplicationSource extends Thread
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
- long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
- this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
this.replicationQueues = replicationQueues;
this.replicationPeers = replicationPeers;
this.manager = manager;
@@ -196,6 +197,15 @@ public class ReplicationSource extends Thread
this.actualPeerId = replicationQueueInfo.getPeerId();
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
this.replicationEndpoint = replicationEndpoint;
+
+ // Server-side default limit; it applies whenever the peer itself has no bandwidth set.
+ defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
+ // Effective limit: the peer's own bandwidth if non-zero, otherwise the default above.
+ currentBandwidth = getCurrentBandwidth();
+ this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
+
+ // Log key spelled "currentBandwidth" so the line can be found by searching for the field name.
+ LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
+     + " inited, replicationQueueSizeCapacity=" + replicationQueueSizeCapacity
+     + ", replicationQueueNbCapacity=" + replicationQueueNbCapacity + ", currentBandwidth="
+     + this.currentBandwidth);
}
private void decorateConf() {
@@ -494,6 +504,13 @@ public class ReplicationSource extends Thread
return this.metrics;
}
+ /**
+  * Resolves the bandwidth limit that currently applies to this source.
+  * The bandwidth of the connected peer is used when it is non-zero; if the peer is not
+  * connected, or its bandwidth is 0, {@code defaultBandwidth} is returned instead.
+  * @return the peer's bandwidth when non-zero, otherwise the default bandwidth
+  */
+ private long getCurrentBandwidth() {
+   ReplicationPeer connectedPeer = this.replicationPeers.getConnectedPeer(peerId);
+   if (connectedPeer != null) {
+     long configured = connectedPeer.getPeerBandwidth();
+     // user can set peer bandwidth to 0 to use default bandwidth
+     if (configured != 0) {
+       return configured;
+     }
+   }
+   return defaultBandwidth;
+ }
+
public class ReplicationSourceWorkerThread extends Thread {
ReplicationSource source;
String walGroupId;
@@ -1087,6 +1104,16 @@ public class ReplicationSource extends Thread
return distinctRowKeys + totalHFileEntries;
}
+ /**
+  * Re-reads the effective bandwidth and, when it differs from the value currently applied,
+  * records the new value, hands it to the throttler (scaled by 1/10) and logs the change.
+  * Does nothing when the bandwidth is unchanged.
+  */
+ private void checkBandwidthChangeAndResetThrottler() {
+   long latestBandwidth = getCurrentBandwidth();
+   if (latestBandwidth == currentBandwidth) {
+     return;
+   }
+   currentBandwidth = latestBandwidth;
+   throttler.setBandwidth((double) currentBandwidth / 10.0);
+   LOG.info("ReplicationSource : " + peerId
+       + " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
+ }
+
/**
* Do the shipping logic
* @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
@@ -1101,6 +1128,7 @@ public class ReplicationSource extends Thread
}
while (isWorkerActive()) {
try {
+ checkBandwidthChangeAndResetThrottler();
if (throttler.isEnabled()) {
long sleepTicks = throttler.getNextSleepInterval(currentSize);
if (sleepTicks > 0) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
index c756576..8da9352 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
@@ -27,8 +27,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
*/
@InterfaceAudience.Private
public class ReplicationThrottler {
- private final boolean enabled;
- private final double bandwidth;
+ private boolean enabled;
+ private double bandwidth;
private long cyclePushSize;
private long cycleStartTick;
@@ -118,4 +118,9 @@ public class ReplicationThrottler {
this.cycleStartTick = EnvironmentEdgeManager.currentTime();
}
}
+
+ /**
+  * Replaces the bandwidth limit held by this throttler and recomputes the enabled flag.
+  * @param bandwidth the new limit; the enabled flag is set only when it is greater than zero
+  */
+ public void setBandwidth(double bandwidth) {
+   this.bandwidth = bandwidth;
+   this.enabled = bandwidth > 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index c0d18dd..7363fb9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -469,4 +469,21 @@ public class TestReplicationAdmin {
admin.removePeer(ID_ONE);
}
+
+ /**
+  * Verifies that a newly added peer reports a bandwidth of 0 and that a bandwidth written
+  * through {@code updatePeerConfig} is returned by a subsequent {@code getPeerConfig}.
+  */
+ @Test
+ public void testPeerBandwidth() throws ReplicationException {
+   final long newBandwidth = 2097152L;
+   ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+   rpc.setClusterKey(KEY_ONE);
+   admin.addPeer(ID_ONE, rpc);
+   try {
+     admin.peerAdded(ID_ONE);
+
+     rpc = admin.getPeerConfig(ID_ONE);
+     assertEquals(0, rpc.getBandwidth());
+
+     rpc.setBandwidth(newBandwidth);
+     admin.updatePeerConfig(ID_ONE, rpc);
+
+     assertEquals(newBandwidth, admin.getPeerConfig(ID_ONE).getBandwidth());
+   } finally {
+     // Remove the peer even when an assertion fails, so ID_ONE does not stay registered.
+     admin.removePeer(ID_ONE);
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-shell/src/main/ruby/hbase/replication_admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
index 8aa158b..5fd23d3 100644
--- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
@@ -252,6 +252,15 @@ module Hbase
end
end
+ # Set new bandwidth config for the specified peer
+ def set_peer_bandwidth(id, bandwidth)
+ rpc = get_peer_config(id)
+ unless rpc.nil?
+ rpc.setBandwidth(bandwidth)
+ @replication_admin.updatePeerConfig(id, rpc)
+ end
+ end
+
#----------------------------------------------------------------------------------------------
# Enables a table's replication switch
def enable_tablerep(table_name)
http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-shell/src/main/ruby/shell.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb
index 02f8191..4b111f1 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -375,6 +375,7 @@ Shell.load_command_group(
remove_peer_namespaces
show_peer_tableCFs
set_peer_tableCFs
+ set_peer_bandwidth
list_replicated_tables
append_peer_tableCFs
remove_peer_tableCFs
http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
index ed6b575..7d53158 100644
--- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
@@ -33,14 +33,15 @@ EOF
peers = replication_admin.list_peers
formatter.header(["PEER_ID", "CLUSTER_KEY", "ENDPOINT_CLASSNAME",
- "STATE", "NAMESPACES", "TABLE_CFS"])
+ "STATE", "NAMESPACES", "TABLE_CFS", "BANDWIDTH"])
peers.entrySet().each do |e|
state = replication_admin.get_peer_state(e.key)
namespaces = replication_admin.show_peer_namespaces(e.value)
tableCFs = replication_admin.show_peer_tableCFs(e.key)
formatter.row([ e.key, e.value.getClusterKey,
- e.value.getReplicationEndpointImpl, state, namespaces, tableCFs ])
+ e.value.getReplicationEndpointImpl, state, namespaces, tableCFs,
+ e.value.getBandwidth ])
end
formatter.footer()
http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-shell/src/main/ruby/shell/commands/set_peer_bandwidth.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_bandwidth.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_bandwidth.rb
new file mode 100644
index 0000000..d9495af
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_bandwidth.rb
@@ -0,0 +1,42 @@
+#
+# Copyright The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+ module Commands
+ class SetPeerBandwidth< Command
+ def help
+ return <<-EOF
+Set the replication source per node bandwidth for the specified peer.
+Examples:
+
+ # set bandwidth=2MB per regionserver for a peer
+ hbase> set_peer_bandwidth '1', 2097152
+ # unset bandwidth for a peer to use the default bandwidth configured in server-side
+ hbase> set_peer_bandwidth '1'
+
+EOF
+ end
+
+ def command(id, bandwidth = 0)
+ replication_admin.set_peer_bandwidth(id, bandwidth)
+ end
+ end
+ end
+end
http://git-wip-us.apache.org/repos/asf/hbase/blob/23335962/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
index 11ff603..cd1fe35 100644
--- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
@@ -371,6 +371,24 @@ module Hbase
command(:remove_peer, @peer_id)
end
+ define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do
+ cluster_key = "localhost:2181:/hbase-test"
+ args = { CLUSTER_KEY => cluster_key }
+ command(:add_peer, @peer_id, args)
+ # Normally the ReplicationSourceManager will call ReplicationPeer#peer_added
+ # but here we have to do it ourselves
+ replication_admin.peer_added(@peer_id)
+
+ peer_config = command(:get_peer_config, @peer_id)
+ assert_equal(0, peer_config.get_bandwidth)
+ command(:set_peer_bandwidth, @peer_id, 2097152)
+ peer_config = command(:get_peer_config, @peer_id)
+ assert_equal(2097152, peer_config.get_bandwidth)
+
+ #cleanup
+ command(:remove_peer, @peer_id)
+ end
+
define_test "get_peer_config: works with simple clusterKey peer" do
cluster_key = "localhost:2181:/hbase-test"
args = { CLUSTER_KEY => cluster_key }