You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/03/30 18:03:11 UTC

[09/50] [abbrv] hbase git commit: HBASE-15360 addendum fix testCoDelScheduling

HBASE-15360 addendum fix testCoDelScheduling

Signed-off-by: stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/23484785
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/23484785
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/23484785

Branch: refs/heads/hbase-12439
Commit: 23484785067bb4a31d5d6a180f578cee70ff9276
Parents: 000117a
Author: zhangduo <zh...@apache.org>
Authored: Tue Mar 22 16:14:20 2016 +0800
Committer: stack <st...@apache.org>
Committed: Tue Mar 22 13:52:26 2016 -0700

----------------------------------------------------------------------
 .../hbase/ipc/AdaptiveLifoCoDelCallQueue.java   |   5 +-
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java    |   2 +-
 .../hbase/ipc/TestSimpleRpcScheduler.java       | 135 +++++++++++++------
 3 files changed, 96 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/23484785/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
index 37e86be..266c6a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * Adaptive LIFO blocking queue utilizing CoDel algorithm to prevent queue overloading.
@@ -77,7 +78,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
   private volatile long minDelay;
 
   // the moment when current interval ends
-  private volatile long intervalTime = System.currentTimeMillis();
+  private volatile long intervalTime = EnvironmentEdgeManager.currentTime();
 
   // switch to ensure only one threads does interval cutoffs
   private AtomicBoolean resetDelay = new AtomicBoolean(true);
@@ -147,7 +148,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
    *   and internal queue state (deemed overloaded).
    */
   private boolean needToDrop(CallRunner callRunner) {
-    long now = System.currentTimeMillis();
+    long now = EnvironmentEdgeManager.currentTime();
     long callDelay = now - callRunner.getCall().timestamp;
 
     long localMinDelay = this.minDelay;

http://git-wip-us.apache.org/repos/asf/hbase/blob/23484785/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 0cd34bb..431aeeb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -203,7 +203,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
       } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
         Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval,
           codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches};
-        callExecutor = new RWQueueRpcExecutor("B.default", handlerCount,
+        callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount,
           numCallQueues, callqReadShare, callqScanShare,
           AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
           AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);

http://git-wip-us.apache.org/repos/asf/hbase/blob/23484785/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 6454537..97ef973 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -17,28 +17,49 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import com.google.protobuf.Message;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CategoryBasedTimeout;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.testclassification.RPCTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.ipc.RpcServer.Call;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Before;
 import org.junit.Rule;
@@ -48,25 +69,11 @@ import org.junit.rules.TestRule;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.protobuf.Message;
 
 @Category({RPCTests.class, SmallTests.class})
 public class TestSimpleRpcScheduler {
@@ -218,7 +225,7 @@ public class TestSimpleRpcScheduler {
       scheduler.dispatch(smallCallTask);
 
       while (work.size() < 8) {
-        Threads.sleepWithoutInterrupt(100);
+        Thread.sleep(100);
       }
 
       int seqSum = 0;
@@ -298,7 +305,7 @@ public class TestSimpleRpcScheduler {
       scheduler.dispatch(scanCallTask);
 
       while (work.size() < 6) {
-        Threads.sleepWithoutInterrupt(100);
+        Thread.sleep(100);
       }
 
       for (int i = 0; i < work.size() - 2; i += 3) {
@@ -326,6 +333,13 @@ public class TestSimpleRpcScheduler {
     }).when(callTask).run();
   }
 
+  private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler)
+      throws InterruptedException {
+    while (scheduler.getGeneralQueueLength() > 0) {
+      Thread.sleep(100);
+    }
+  }
+
   @Test
   public void testSoftAndHardQueueLimits() throws Exception {
     Configuration schedConf = HBaseConfiguration.create();
@@ -354,9 +368,7 @@ public class TestSimpleRpcScheduler {
       schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
       scheduler.onConfigurationChange(schedConf);
       assertFalse(scheduler.dispatch(putCallTask));
-      while (scheduler.getGeneralQueueLength() > 0) {
-        Threads.sleepWithoutInterrupt(100);
-      }
+      waitUntilQueueEmpty(scheduler);
       schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1);
       scheduler.onConfigurationChange(schedConf);
       assertTrue(scheduler.dispatch(putCallTask));
@@ -365,8 +377,30 @@ public class TestSimpleRpcScheduler {
     }
   }
 
+  private static final class CoDelEnvironmentEdge implements EnvironmentEdge {
+
+    private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>();
+
+    private long offset;
+
+    private final Set<String> threadNamePrefixs = new HashSet<>();
+
+    @Override
+    public long currentTime() {
+      for (String threadNamePrefix : threadNamePrefixs) {
+        if (Thread.currentThread().getName().startsWith(threadNamePrefix)) {
+          return timeQ.poll().longValue() + offset;
+        }
+      }
+      return System.currentTimeMillis();
+    }
+  }
+
   @Test
   public void testCoDelScheduling() throws Exception {
+    CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
+    envEdge.threadNamePrefixs.add("RW.default");
+    envEdge.threadNamePrefixs.add("B.default");
     Configuration schedConf = HBaseConfiguration.create();
 
     schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY,
@@ -379,36 +413,51 @@ public class TestSimpleRpcScheduler {
       HConstants.QOS_THRESHOLD);
     try {
       scheduler.start();
-
+      EnvironmentEdgeManager.injectEdge(envEdge);
+      envEdge.offset = 5;
       // calls faster than min delay
       for (int i = 0; i < 100; i++) {
-        CallRunner cr = getMockedCallRunner();
+        long time = System.currentTimeMillis();
+        envEdge.timeQ.put(time);
+        CallRunner cr = getMockedCallRunner(time);
         Thread.sleep(5);
         scheduler.dispatch(cr);
       }
-      Thread.sleep(100); // make sure fast calls are handled
+      // make sure fast calls are handled
+      waitUntilQueueEmpty(scheduler);
+      Thread.sleep(100);
       assertEquals("None of these calls should have been discarded", 0,
         scheduler.getNumGeneralCallsDropped());
 
+      envEdge.offset = 6;
       // calls slower than min delay, but not individually slow enough to be dropped
       for (int i = 0; i < 20; i++) {
-        CallRunner cr = getMockedCallRunner();
+        long time = System.currentTimeMillis();
+        envEdge.timeQ.put(time);
+        CallRunner cr = getMockedCallRunner(time);
         Thread.sleep(6);
         scheduler.dispatch(cr);
       }
 
-      Thread.sleep(100); // make sure somewhat slow calls are handled
+      // make sure somewhat slow calls are handled
+      waitUntilQueueEmpty(scheduler);
+      Thread.sleep(100);
       assertEquals("None of these calls should have been discarded", 0,
         scheduler.getNumGeneralCallsDropped());
 
+      envEdge.offset = 12;
       // now slow calls and the ones to be dropped
       for (int i = 0; i < 20; i++) {
-        CallRunner cr = getMockedCallRunner();
+        long time = System.currentTimeMillis();
+        envEdge.timeQ.put(time);
+        CallRunner cr = getMockedCallRunner(time);
         Thread.sleep(12);
         scheduler.dispatch(cr);
       }
 
-      Thread.sleep(100); // make sure somewhat slow calls are handled
+      // make sure somewhat slow calls are handled
+      waitUntilQueueEmpty(scheduler);
+      Thread.sleep(100);
       assertTrue("There should have been at least 12 calls dropped",
         scheduler.getNumGeneralCallsDropped() > 12);
     } finally {
@@ -416,7 +465,7 @@ public class TestSimpleRpcScheduler {
     }
   }
 
-  private CallRunner getMockedCallRunner() throws IOException {
+  private CallRunner getMockedCallRunner(long timestamp) throws IOException {
     CallRunner putCallTask = mock(CallRunner.class);
     RpcServer.Call putCall = mock(RpcServer.Call.class);
     putCall.param = RequestConverter.buildMutateRequest(
@@ -424,7 +473,7 @@ public class TestSimpleRpcScheduler {
     RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build();
     when(putCallTask.getCall()).thenReturn(putCall);
     when(putCall.getHeader()).thenReturn(putHead);
-    putCall.timestamp = System.currentTimeMillis();
+    putCall.timestamp = timestamp;
     return putCallTask;
   }
 }