You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2014/02/11 01:48:26 UTC

svn commit: r1566922 - in /hbase/branches/0.98/hbase-server/src: main/java/org/apache/hadoop/hbase/replication/regionserver/ test/java/org/apache/hadoop/hbase/replication/regionserver/

Author: jdcryans
Date: Tue Feb 11 00:48:26 2014
New Revision: 1566922

URL: http://svn.apache.org/r1566922
Log:
HBASE-9501 Provide throttling for replication (Feng Honghua via JD)

Added:
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
    hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java
Modified:
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1566922&r1=1566921&r2=1566922&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Tue Feb 11 00:48:26 2014
@@ -131,6 +131,8 @@ public class ReplicationSource extends T
   private ReplicationSinkManager replicationSinkMgr;
   //WARN threshold for the number of queued logs, defaults to 2
   private int logQueueWarnThreshold;
+  // throttler
+  private ReplicationThrottler throttler;
 
   /**
    * Instantiation method used by region servers
@@ -164,6 +166,8 @@ public class ReplicationSource extends T
     // replication and make replication specific settings such as compression or codec to use
     // passing Cells.
     this.conn = HConnectionManager.getConnection(this.conf);
+    long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
+    this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
     this.replicationQueues = replicationQueues;
     this.replicationPeers = replicationPeers;
     this.manager = manager;
@@ -598,6 +602,7 @@ public class ReplicationSource extends T
       Thread.sleep(this.sleepForRetries * sleepMultiplier);
     } catch (InterruptedException e) {
       LOG.debug("Interrupted while sleeping between retries");
+      Thread.currentThread().interrupt();
     }
     return sleepMultiplier < maxRetriesMultiplier;
   }
@@ -661,6 +666,22 @@ public class ReplicationSource extends T
       }
       SinkPeer sinkPeer = null;
       try {
+        if (this.throttler.isEnabled()) {
+          long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
+          if (sleepTicks > 0) {
+            try {
+              if (LOG.isTraceEnabled()) {
+                LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
+              }
+              Thread.sleep(sleepTicks);
+            } catch (InterruptedException e) {
+              LOG.debug("Interrupted while sleeping for throttling control");
+              Thread.currentThread().interrupt();
+            }
+            // reset throttler's cycle start tick when sleep for throttling occurs
+            this.throttler.resetStartTick();
+          }
+        }
         sinkPeer = replicationSinkMgr.getReplicationSink();
         BlockingInterface rrs = sinkPeer.getRegionServer();
         if (LOG.isTraceEnabled()) {
@@ -675,6 +696,9 @@ public class ReplicationSource extends T
               this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
           this.lastLoggedPosition = this.repLogReader.getPosition();
         }
+        if (this.throttler.isEnabled()) {
+          this.throttler.addPushSize(currentSize);
+        }
         this.totalReplicatedEdits += entries.size();
         this.totalReplicatedOperations += currentNbOperations;
         this.metrics.shipBatch(this.currentNbOperations);

Added: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java?rev=1566922&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java (added)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java Tue Feb 11 00:48:26 2014
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Per-peer per-node throttling controller for replication.
+ * <p>
+ * Time is divided into cycles of 100ms. Throttling is enabled only if the
+ * per-cycle bandwidth (in bytes) is greater than 0. Within a cycle, a push
+ * that would bring the cycle's total to or above the bandwidth is delayed to
+ * the next cycle, unless it is the first push of the cycle. If a cycle's
+ * total ends up above the bandwidth (e.g. because its first push alone was
+ * larger), the next call asks the caller to sleep until ceil(total /
+ * bandwidth) cycles after that cycle's start, amortizing the excess.
+ * <p>
+ * Expected usage for each push: call {@link #getNextSleepInterval(long)}; if
+ * it returns a positive value, sleep that long and then call
+ * {@link #resetStartTick()}; finally push and call {@link #addPushSize(long)}.
+ * <p>
+ * The state is mutable and unsynchronized, so an instance must not be shared
+ * between threads without external synchronization.
+ */
+@InterfaceAudience.Private
+public class ReplicationThrottler {
+  /** Length of a throttling cycle, in milliseconds. */
+  private static final long CYCLE_MS = 100;
+
+  private final boolean enabled;
+  /** Bytes allowed per cycle; throttling is enabled only if this is > 0. */
+  private final double bandwidth;
+  /** Bytes recorded via addPushSize in the current cycle. */
+  private long cyclePushSize;
+  /** Start of the current cycle, in ms as returned by EnvironmentEdgeManager. */
+  private long cycleStartTick;
+
+  /**
+   * ReplicationThrottler constructor.
+   * Throttling is enabled only if {@code bandwidth} is greater than 0; zero,
+   * negative values and NaN disable it.
+   * @param bandwidth maximum number of bytes to push per cycle (100ms)
+   */
+  public ReplicationThrottler(final double bandwidth) {
+    this.bandwidth = bandwidth;
+    this.enabled = this.bandwidth > 0;
+    if (this.enabled) {
+      this.cyclePushSize = 0;
+      this.cycleStartTick = EnvironmentEdgeManager.currentTimeMillis();
+    }
+  }
+
+  /**
+   * Whether throttling is enabled.
+   * @return true if throttling is enabled
+   */
+  public boolean isEnabled() {
+    return this.enabled;
+  }
+
+  /**
+   * Get how long the caller should sleep before pushing {@code size} bytes,
+   * based on the current cycle's total push size and start tick.
+   * <p>
+   * This may also start a new cycle and/or reset the current cycle's push
+   * size. After sleeping for a positive returned interval the caller should
+   * call {@link #resetStartTick()}.
+   * @param size the size of the edits to be pushed
+   * @return sleep interval in milliseconds; 0 if no sleep is needed or if
+   *   throttling is disabled
+   */
+  public long getNextSleepInterval(final long size) {
+    if (!this.enabled) {
+      return 0;
+    }
+
+    long sleepTicks = 0;
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    // 1. if cyclePushSize exceeds bandwidth, we need to sleep some
+    //    following cycles to amortize, this case can occur when a single push
+    //    exceeds the bandwidth
+    if ((double)this.cyclePushSize > bandwidth) {
+      double cycles = Math.ceil((double)this.cyclePushSize / bandwidth);
+      long shouldTillTo = this.cycleStartTick + (long)(cycles * CYCLE_MS);
+      if (shouldTillTo > now) {
+        sleepTicks = shouldTillTo - now;
+      } else {
+        // the amortization period is already over; with no sleep the caller
+        // won't call resetStartTick(), so start the new cycle here
+        this.cycleStartTick = now;
+      }
+      this.cyclePushSize = 0;
+    } else {
+      long nextCycleTick = this.cycleStartTick + CYCLE_MS;
+      if (now >= nextCycleTick) {
+        // 2. switch to next cycle if the current cycle has passed
+        this.cycleStartTick = now;
+        this.cyclePushSize = 0;
+      } else if (this.cyclePushSize > 0 &&
+          (double)(this.cyclePushSize + size) >= bandwidth) {
+        // 3. delay the push to the next cycle if it would reach the bandwidth.
+        //    Requiring cyclePushSize > 0 avoids an unnecessary sleep when a
+        //    cycle's first push alone exceeds the bandwidth; that case is
+        //    amortized by branch 1 on the next call.
+        sleepTicks = nextCycleTick - now;
+        this.cyclePushSize = 0;
+      }
+    }
+    return sleepTicks;
+  }
+
+  /**
+   * Add the given size to the current cycle's total push size.
+   * Does nothing if throttling is disabled.
+   * @param size the size just pushed
+   */
+  public void addPushSize(final long size) {
+    if (this.enabled) {
+      this.cyclePushSize += size;
+    }
+  }
+
+  /**
+   * Reset the current cycle's start tick to now.
+   * Does nothing if throttling is disabled.
+   */
+  public void resetStartTick() {
+    if (this.enabled) {
+      this.cycleStartTick = EnvironmentEdgeManager.currentTimeMillis();
+    }
+  }
+}

Added: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java?rev=1566922&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java (added)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java Tue Feb 11 00:48:26 2014
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.SmallTests;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestReplicationThrottler {
+
+  private static final Log LOG = LogFactory.getLog(TestReplicationThrottler.class);
+
+  /**
+   * Wall-clock drift (ms) tolerated between a throttler's clock reads. The
+   * throttler reads the real clock through EnvironmentEdgeManager, so a slow
+   * or paused test thread shortens the returned sleep intervals.
+   */
+  private static final long CLOCK_SLACK_MS = 50;
+
+  /**
+   * Asserts that {@code actual} equals {@code expected} shortened by less than
+   * {@link #CLOCK_SLACK_MS} of elapsed time.
+   */
+  private static void assertSleepTicks(long expected, long actual) {
+    assertTrue("expected a sleep of about " + expected + "ms but got " + actual + "ms",
+        actual <= expected && actual > expected - CLOCK_SLACK_MS);
+  }
+
+  /**
+   * Unit test for throttling: drives two throttlers through a fixed sequence
+   * of pushes and checks the sleep interval computed before each push.
+   */
+  @Test(timeout=10000)
+  public void testThrottling() {
+    LOG.info("testThrottling");
+
+    // throttle bandwidth is 100 and 10 bytes/cycle respectively
+    ReplicationThrottler throttler1 = new ReplicationThrottler(100);
+    ReplicationThrottler throttler2 = new ReplicationThrottler(10);
+
+    long ticks1 = throttler1.getNextSleepInterval(1000);
+    long ticks2 = throttler2.getNextSleepInterval(1000);
+
+    // 1. the first push size is 1000; although 1000 bytes exceeds both the
+    //    100 and 10 byte bandwidths, there is no sleep since it's the first
+    //    push of the current cycle; amortizing occurs when the next push arrives
+    assertEquals(0, ticks1);
+    assertEquals(0, ticks2);
+
+    throttler1.addPushSize(1000);
+    throttler2.addPushSize(1000);
+
+    ticks1 = throttler1.getNextSleepInterval(5);
+    ticks2 = throttler2.getNextSleepInterval(5);
+
+    // 2. when the second push(5) arrives and throttling(5) is called, the
+    //    current cyclePushSize is 1000 bytes, this should make throttler1
+    //    sleep 1000/100 = 10 cycles = 1s and make throttler2 sleep 1000/10
+    //    = 100 cycles = 10s before the second push occurs -- amortize case
+    //    after amortizing, both cycleStartTick and cyclePushSize are reset
+    assertSleepTicks(1000, ticks1);
+    assertSleepTicks(10000, ticks2);
+
+    throttler1.resetStartTick();
+    throttler2.resetStartTick();
+
+    throttler1.addPushSize(5);
+    throttler2.addPushSize(5);
+
+    ticks1 = throttler1.getNextSleepInterval(45);
+    ticks2 = throttler2.getNextSleepInterval(45);
+
+    // 3. when the third push(45) arrives and throttling(45) is called, the
+    //    current cyclePushSize is 5 bytes; 5+45 = 50 bytes makes throttler1 not
+    //    sleep, but makes throttler2 delay to the next cycle
+    // note: in a real run, the sleep time should also account for the time
+    //       that elapses during the push operation
+    assertEquals(0, ticks1);
+    assertSleepTicks(100, ticks2);
+
+    throttler2.resetStartTick();
+
+    throttler1.addPushSize(45);
+    throttler2.addPushSize(45);
+
+    ticks1 = throttler1.getNextSleepInterval(60);
+    ticks2 = throttler2.getNextSleepInterval(60);
+
+    // 4. when the fourth push(60) arrives and throttling(60) is called, throttler1
+    //    delays to the next cycle since 5+45+60 = 110 >= 100; throttler2 (whose
+    //    cyclePushSize was reset to 0 by the delay in step 3) should first sleep
+    //    ceiling(45/10) = 5 cycles = 500ms to amortize the previous push
+    // note: in a real run, the sleep time should also account for the time
+    //       that elapses during the push operation
+    assertSleepTicks(100, ticks1);
+    assertSleepTicks(500, ticks2);
+  }
+}