You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2022/02/18 18:49:01 UTC

[hbase] branch branch-2.5 updated: HBASE-26576 Allow pluggable queue to be used with the fast path executor or normal balanced executor (#3944)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 2be2072  HBASE-26576 Allow pluggable queue to be used with the fast path executor or  normal balanced executor (#3944)
2be2072 is described below

commit 2be20727040b367b7f009d4651fa78b842f9b2c3
Author: Richard Marscher <rm...@users.noreply.github.com>
AuthorDate: Fri Feb 18 13:00:10 2022 -0500

    HBASE-26576 Allow pluggable queue to be used with the fast path executor or  normal balanced executor (#3944)
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
---
 .../org/apache/hadoop/hbase/ipc/RpcExecutor.java   |  7 ++++
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java       |  4 +-
 .../hadoop/hbase/ipc/TestSimpleRpcScheduler.java   | 43 ++++++++++++++++++++--
 3 files changed, 49 insertions(+), 5 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 1d5a970..a5685ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -82,6 +82,8 @@ public abstract class RpcExecutor {
 
   public static final String PLUGGABLE_CALL_QUEUE_CLASS_NAME =
     "hbase.ipc.server.callqueue.pluggable.queue.class.name";
+  public static final String PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED =
+    "hbase.ipc.server.callqueue.pluggable.queue.fast.path.enabled";
 
   private LongAdder numGeneralCallsDropped = new LongAdder();
   private LongAdder numLifoModeSwitches = new LongAdder();
@@ -380,6 +382,11 @@ public abstract class RpcExecutor {
     return callQueueType.equals(CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE);
   }
 
+  public static boolean isPluggableQueueWithFastPath(String callQueueType, Configuration conf) {
+    return isPluggableQueueType(callQueueType) &&
+      conf.getBoolean(PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, false);
+  }
+
   private Optional<Class<? extends BlockingQueue<CallRunner>>> getPluggableQueueClass() {
     String queueClassName = conf.get(PLUGGABLE_CALL_QUEUE_CLASS_NAME);
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 1b8887a..be0d531 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -88,7 +88,9 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
       callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount),
         maxQueueLength, priority, conf, server);
     } else {
-      if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) {
+      if (RpcExecutor.isFifoQueueType(callQueueType) ||
+        RpcExecutor.isCodelQueueType(callQueueType) ||
+        RpcExecutor.isPluggableQueueWithFastPath(callQueueType, conf)) {
         callExecutor = new FastPathBalancedQueueRpcExecutor("default.FPBQ", handlerCount,
             maxQueueLength, priority, conf, server);
       } else {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 286094c..3f05e86 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -275,6 +276,41 @@ public class TestSimpleRpcScheduler {
   }
 
   @Test
+  public void testPluggableRpcQueueWireUpWithFastPathExecutor() throws Exception {
+    String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE;
+    Configuration schedConf = HBaseConfiguration.create();
+    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
+    schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
+    schedConf.setBoolean(RpcExecutor.PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, true);
+
+    PriorityFunction priority = mock(PriorityFunction.class);
+    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
+    SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority,
+      HConstants.QOS_THRESHOLD);
+
+    Field f = scheduler.getClass().getDeclaredField("callExecutor");
+    f.setAccessible(true);
+    assertTrue(f.get(scheduler) instanceof FastPathBalancedQueueRpcExecutor);
+  }
+
+  @Test
+  public void testPluggableRpcQueueWireUpWithoutFastPathExecutor() throws Exception {
+    String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE;
+    Configuration schedConf = HBaseConfiguration.create();
+    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
+    schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
+
+    PriorityFunction priority = mock(PriorityFunction.class);
+    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
+    SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority,
+      HConstants.QOS_THRESHOLD);
+
+    Field f = scheduler.getClass().getDeclaredField("callExecutor");
+    f.setAccessible(true);
+    assertTrue(f.get(scheduler) instanceof BalancedQueueRpcExecutor);
+  }
+
+  @Test
   public void testPluggableRpcQueueCanListenToConfigurationChanges() throws Exception {
 
     Configuration schedConf = HBaseConfiguration.create();
@@ -316,9 +352,7 @@ public class TestSimpleRpcScheduler {
     testRpcScheduler(queueType, null);
   }
 
-  private void testRpcScheduler(final String queueType, final String pluggableQueueClass)
-    throws Exception {
-
+  private void testRpcScheduler(final String queueType, final String pluggableQueueClass) throws Exception {
     Configuration schedConf = HBaseConfiguration.create();
     schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
 
@@ -388,7 +422,8 @@ public class TestSimpleRpcScheduler {
       // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
       if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
         assertEquals(530, totalTime);
-      } else if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) {
+      } else if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE) ||
+        queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE)) {
         assertEquals(930, totalTime);
       }
     } finally {