You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2014/02/11 01:48:26 UTC
svn commit: r1566922 - in /hbase/branches/0.98/hbase-server/src:
main/java/org/apache/hadoop/hbase/replication/regionserver/
test/java/org/apache/hadoop/hbase/replication/regionserver/
Author: jdcryans
Date: Tue Feb 11 00:48:26 2014
New Revision: 1566922
URL: http://svn.apache.org/r1566922
Log:
HBASE-9501 Provide throttling for replication (Feng Honghua via JD)
Added:
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java
Modified:
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1566922&r1=1566921&r2=1566922&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Tue Feb 11 00:48:26 2014
@@ -131,6 +131,8 @@ public class ReplicationSource extends T
private ReplicationSinkManager replicationSinkMgr;
//WARN threshold for the number of queued logs, defaults to 2
private int logQueueWarnThreshold;
+ // throttler
+ private ReplicationThrottler throttler;
/**
* Instantiation method used by region servers
@@ -164,6 +166,8 @@ public class ReplicationSource extends T
// replication and make replication specific settings such as compression or codec to use
// passing Cells.
this.conn = HConnectionManager.getConnection(this.conf);
+ long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
+ this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
this.replicationQueues = replicationQueues;
this.replicationPeers = replicationPeers;
this.manager = manager;
@@ -598,6 +602,7 @@ public class ReplicationSource extends T
Thread.sleep(this.sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping between retries");
+ Thread.currentThread().interrupt();
}
return sleepMultiplier < maxRetriesMultiplier;
}
@@ -661,6 +666,22 @@ public class ReplicationSource extends T
}
SinkPeer sinkPeer = null;
try {
+ if (this.throttler.isEnabled()) {
+ long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
+ if (sleepTicks > 0) {
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
+ }
+ Thread.sleep(sleepTicks);
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted while sleeping for throttling control");
+ Thread.currentThread().interrupt();
+ }
+ // reset throttler's cycle start tick when sleep for throttling occurs
+ this.throttler.resetStartTick();
+ }
+ }
sinkPeer = replicationSinkMgr.getReplicationSink();
BlockingInterface rrs = sinkPeer.getRegionServer();
if (LOG.isTraceEnabled()) {
@@ -675,6 +696,9 @@ public class ReplicationSource extends T
this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
this.lastLoggedPosition = this.repLogReader.getPosition();
}
+ if (this.throttler.isEnabled()) {
+ this.throttler.addPushSize(currentSize);
+ }
this.totalReplicatedEdits += entries.size();
this.totalReplicatedOperations += currentNbOperations;
this.metrics.shipBatch(this.currentNbOperations);
Added: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java?rev=1566922&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java (added)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java Tue Feb 11 00:48:26 2014
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Per-peer per-node throttling controller for replication: enabled if
+ * bandwidth > 0, a cycle = 100ms, by throttling we guarantee data pushed
+ * to peer within each cycle won't exceed 'bandwidth' bytes
+ */
+@InterfaceAudience.Private
+public class ReplicationThrottler {
+ private final boolean enabled;
+ private final double bandwidth;
+ private long cyclePushSize;
+ private long cycleStartTick;
+
+ /**
+ * ReplicationThrottler constructor
+ * If bandwidth is not greater than 0, throttling is disabled
+ * @param bandwidth per cycle(100ms)
+ */
+ public ReplicationThrottler(final double bandwidth) {
+ this.bandwidth = bandwidth;
+ this.enabled = this.bandwidth > 0;
+ if (this.enabled) {
+ this.cyclePushSize = 0;
+ this.cycleStartTick = EnvironmentEdgeManager.currentTimeMillis();
+ }
+ }
+
+ /**
+ * If throttling is enabled
+ * @return true if throttling is enabled
+ */
+ public boolean isEnabled() {
+ return this.enabled;
+ }
+
+ /**
+ * Get how long the caller should sleep according to the current size and
+ * current cycle's total push size and start tick, return the sleep interval
+ * for throttling control.
+ * @param size is the size of edits to be pushed
+ * @return sleep interval for throttling control
+ */
+ public long getNextSleepInterval(final int size) {
+ if (!this.enabled) {
+ return 0;
+ }
+
+ long sleepTicks = 0;
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ // 1. if cyclePushSize exceeds bandwidth, we need to sleep some
+ // following cycles to amortize, this case can occur when a single push
+ // exceeds the bandwidth
+ if ((double)this.cyclePushSize > bandwidth) {
+ double cycles = Math.ceil((double)this.cyclePushSize / bandwidth);
+ long shouldTillTo = this.cycleStartTick + (long)(cycles * 100);
+ if (shouldTillTo > now) {
+ sleepTicks = shouldTillTo - now;
+ } else {
+ // no reset in shipEdits since no sleep, so we need to reset cycleStartTick here!
+ this.cycleStartTick = now;
+ }
+ this.cyclePushSize = 0;
+ } else {
+ long nextCycleTick = this.cycleStartTick + 100; //a cycle is 100ms
+ if (now >= nextCycleTick) {
+ // 2. switch to next cycle if the current cycle has passed
+ this.cycleStartTick = now;
+ this.cyclePushSize = 0;
+ } else if (this.cyclePushSize > 0 &&
+ (double)(this.cyclePushSize + size) >= bandwidth) {
+ // 3. delay the push to next cycle if exceeds throttling bandwidth.
+ // enforcing cyclePushSize > 0 to avoid the unnecessary sleep for case
+ // where a cycle's first push size(currentSize) > bandwidth
+ sleepTicks = nextCycleTick - now;
+ this.cyclePushSize = 0;
+ }
+ }
+ return sleepTicks;
+ }
+
+ /**
+ * Add current size to the current cycle's total push size
+ * @param size is the current size added to the current cycle's
+ * total push size
+ */
+ public void addPushSize(final int size) {
+ if (this.enabled) {
+ this.cyclePushSize += size;
+ }
+ }
+
+ /**
+ * Reset the cycle start tick to NOW
+ */
+ public void resetStartTick() {
+ if (this.enabled) {
+ this.cycleStartTick = EnvironmentEdgeManager.currentTimeMillis();
+ }
+ }
+}
Added: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java?rev=1566922&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java (added)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java Tue Feb 11 00:48:26 2014
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.SmallTests;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestReplicationThrottler {
+
+ private static final Log LOG = LogFactory.getLog(TestReplicationThrottler.class);
+
+ /**
+ * unit test for throttling
+ */
+ @Test(timeout=10000)
+ public void testThrottling() {
+ LOG.info("testThrottling");
+
+ // throttle bandwidth is 100 and 10 bytes/cycle respectively
+ ReplicationThrottler throttler1 = new ReplicationThrottler(100);
+ ReplicationThrottler throttler2 = new ReplicationThrottler(10);
+
+ long ticks1 = throttler1.getNextSleepInterval(1000);
+ long ticks2 = throttler2.getNextSleepInterval(1000);
+
+ // 1. the first push size is 1000; although 1000 bytes exceeds both the
+ // 100 and 10 bandwidths, there is no sleep since it's the first push of
+ // the current cycle; amortizing occurs when the next push arrives
+ assertEquals(0, ticks1);
+ assertEquals(0, ticks2);
+
+ throttler1.addPushSize(1000);
+ throttler2.addPushSize(1000);
+
+ ticks1 = throttler1.getNextSleepInterval(5);
+ ticks2 = throttler2.getNextSleepInterval(5);
+
+ // 2. when the second push(5) arrives and throttling(5) is called, the
+ // current cyclePushSize is 1000 bytes, this should make throttler1
+ // sleep 1000/100 = 10 cycles = 1s and make throttler2 sleep 1000/10
+ // = 100 cycles = 10s before the second push occurs -- amortize case
+ // after amortizing, both cycleStartTick and cyclePushSize are reset
+ assertTrue(ticks1 == 1000 || ticks1 == 999);
+ assertTrue(ticks2 == 10000 || ticks2 == 9999);
+
+ throttler1.resetStartTick();
+ throttler2.resetStartTick();
+
+ throttler1.addPushSize(5);
+ throttler2.addPushSize(5);
+
+ ticks1 = throttler1.getNextSleepInterval(45);
+ ticks2 = throttler2.getNextSleepInterval(45);
+
+ // 3. when the third push(45) arrives and throttling(45) is called, the
+ // current cyclePushSize is 5 bytes; the 50-byte total does not make
+ // throttler1 sleep, but does make throttler2 delay to the next cycle
+ // note: in the real case, sleep time should cover the time elapsed during
+ // the push operation
+ assertTrue(ticks1 == 0);
+ assertTrue(ticks2 == 100 || ticks2 == 99);
+
+ throttler2.resetStartTick();
+
+ throttler1.addPushSize(45);
+ throttler2.addPushSize(45);
+
+ ticks1 = throttler1.getNextSleepInterval(60);
+ ticks2 = throttler2.getNextSleepInterval(60);
+
+ // 4. when the fourth push(60) arrives and throttling(60) is called, throttler1
+ // delays to the next cycle since 5+45+60 == 110 >= 100; and throttler2 should first
+ // sleep ceiling(45/10) = 5 cycles = 500ms to amortize the previous push
+ // note: in the real case, sleep time should cover the time elapsed during
+ // the push operation
+ assertTrue(ticks1 == 100 || ticks1 == 99);
+ assertTrue(ticks2 == 500 || ticks2 == 499);
+ }
+}