You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2016/02/23 23:21:42 UTC

hbase git commit: HBASE-15306 Make RPC call queue length dynamically configurable

Repository: hbase
Updated Branches:
  refs/heads/master 58283fa1b -> f47dba74d


HBASE-15306 Make RPC call queue length dynamically configurable


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f47dba74
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f47dba74
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f47dba74

Branch: refs/heads/master
Commit: f47dba74d498d5d39f124ad8ea5723c437acbc85
Parents: 58283fa
Author: Mikhail Antonov <an...@apache.org>
Authored: Tue Feb 23 14:20:40 2016 -0800
Committer: Mikhail Antonov <an...@apache.org>
Committed: Tue Feb 23 14:20:40 2016 -0800

----------------------------------------------------------------------
 .../hbase/ipc/BalancedQueueRpcExecutor.java     | 11 +++++-
 .../hadoop/hbase/ipc/RWQueueRpcExecutor.java    | 19 +++++++++-
 .../apache/hadoop/hbase/ipc/RpcExecutor.java    | 11 ++++++
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |  3 ++
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java    | 18 ++++++++-
 .../hbase/ipc/TestSimpleRpcScheduler.java       | 39 ++++++++++++++++++++
 6 files changed, 97 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f47dba74/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
index 79b4ec8..e4205eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
@@ -66,6 +66,10 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
 
   protected void initializeQueues(final int numQueues,
       final Class<? extends BlockingQueue> queueClass, Object... initargs) {
+    if (initargs.length > 0) {
+      currentQueueLimit = (int) initargs[0];
+      initargs[0] = Math.max((int) initargs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
+    }
     for (int i = 0; i < numQueues; ++i) {
       queues.add((BlockingQueue<CallRunner>) ReflectionUtils.newInstance(queueClass, initargs));
     }
@@ -74,7 +78,12 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
   @Override
   public boolean dispatch(final CallRunner callTask) throws InterruptedException {
     int queueIndex = balancer.getNextQueue();
-    return queues.get(queueIndex).offer(callTask);
+    BlockingQueue<CallRunner> queue = queues.get(queueIndex);
+    // check and offer are not atomic: we can overflow by at most <num reader> size (5), that's ok
+    if (queue.size() >= currentQueueLimit) {
+      return false;
+    }
+    return queue.offer(callTask);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/f47dba74/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 544370d..a9648b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -139,12 +139,22 @@ public class RWQueueRpcExecutor extends RpcExecutor {
               " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount +
               ((numScanQueues == 0) ? "" : " scanQueues=" + numScanQueues +
                 " scanHandlers=" + scanHandlersCount));
-
+    if (writeQueueInitArgs.length > 0) {
+      currentQueueLimit = (int) writeQueueInitArgs[0];
+      writeQueueInitArgs[0] = Math.max((int) writeQueueInitArgs[0],
+        DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
+    }
     for (int i = 0; i < numWriteQueues; ++i) {
+
       queues.add((BlockingQueue<CallRunner>)
         ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
     }
 
+    if (readQueueInitArgs.length > 0) {
+      currentQueueLimit = (int) readQueueInitArgs[0];
+      readQueueInitArgs[0] = Math.max((int) readQueueInitArgs[0],
+        DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
+    }
     for (int i = 0; i < (numReadQueues + numScanQueues); ++i) {
       queues.add((BlockingQueue<CallRunner>)
         ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
@@ -170,7 +180,12 @@ public class RWQueueRpcExecutor extends RpcExecutor {
     } else {
       queueIndex = numWriteQueues + readBalancer.getNextQueue();
     }
-    return queues.get(queueIndex).offer(callTask);
+
+    BlockingQueue<CallRunner> queue = queues.get(queueIndex);
+    if (queue.size() >= currentQueueLimit) {
+      return false;
+    }
+    return queue.offer(callTask);
   }
 
   private boolean isWriteRequest(final RequestHeader header, final Message param) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f47dba74/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 017bf39..22cb195 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -42,6 +42,9 @@ import com.google.common.base.Strings;
 public abstract class RpcExecutor {
   private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
 
+  protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
+  protected volatile int currentQueueLimit;
+
   private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
   private final List<Thread> handlers;
   private final int handlerCount;
@@ -210,4 +213,12 @@ public abstract class RpcExecutor {
       return ThreadLocalRandom.current().nextInt(queueSize);
     }
   }
+
+  /**
+   * Update current soft limit for executor's call queues
+   * @param conf updated configuration
+   */
+  public void resizeQueues(Configuration conf) {
+    currentQueueLimit = conf.getInt("hbase.ipc.server.max.callqueue.length", currentQueueLimit);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f47dba74/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 58fc598..6ddfb9a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -2099,6 +2099,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
   @Override
   public void onConfigurationChange(Configuration newConf) {
     initReconfigurable(newConf);
+    if (scheduler instanceof ConfigurationObserver) {
+      ((ConfigurationObserver)scheduler).onConfigurationChange(newConf);
+    }
   }
 
   private void initReconfigurable(Configuration confToLoad) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f47dba74/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 8de714d..0003254 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
 
 /**
@@ -36,7 +37,7 @@ import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
  */
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
 @InterfaceStability.Evolving
-public class SimpleRpcScheduler extends RpcScheduler {
+public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver {
   private static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
 
   public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
@@ -56,6 +57,21 @@ public class SimpleRpcScheduler extends RpcScheduler {
       = "hbase.ipc.server.queue.max.call.delay";
 
   /**
+   * Update the soft limit of the call queues from the new configuration.
+   * @param conf new configuration
+   */
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    callExecutor.resizeQueues(conf);
+    if (priorityExecutor != null) {
+      priorityExecutor.resizeQueues(conf);
+    }
+    if (replicationExecutor != null) {
+      replicationExecutor.resizeQueues(conf);
+    }
+  }
+
+  /**
    * Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true.
    * It uses the calculated "deadline" e.g. to deprioritize long-running job
    *

http://git-wip-us.apache.org/repos/asf/hbase/blob/f47dba74/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index db992cd..66032e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -56,7 +56,9 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
@@ -323,4 +325,41 @@ public class TestSimpleRpcScheduler {
       }
     }).when(callTask).run();
   }
+
+  @Test
+  public void testSoftAndHardQueueLimits() throws Exception {
+    Configuration schedConf = HBaseConfiguration.create();
+
+    schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0);
+    schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
+
+    PriorityFunction priority = mock(PriorityFunction.class);
+    when(priority.getPriority(any(RequestHeader.class), any(Message.class),
+      any(User.class))).thenReturn(HConstants.NORMAL_QOS);
+    SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority,
+      HConstants.QOS_THRESHOLD);
+    try {
+      scheduler.start();
+
+      CallRunner putCallTask = mock(CallRunner.class);
+      RpcServer.Call putCall = mock(RpcServer.Call.class);
+      putCall.param = RequestConverter.buildMutateRequest(
+        Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
+      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
+      when(putCallTask.getCall()).thenReturn(putCall);
+      when(putCall.getHeader()).thenReturn(putHead);
+
+      assertTrue(scheduler.dispatch(putCallTask));
+
+      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
+      scheduler.onConfigurationChange(schedConf);
+      assertFalse(scheduler.dispatch(putCallTask));
+
+      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1);
+      scheduler.onConfigurationChange(schedConf);
+      assertTrue(scheduler.dispatch(putCallTask));
+    } finally {
+      scheduler.stop();
+    }
+  }
 }