You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2022/02/18 18:49:01 UTC
[hbase] branch branch-2.5 updated: HBASE-26576 Allow pluggable queue to be used with the fast path executor or normal balanced executor (#3944)
This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new 2be2072 HBASE-26576 Allow pluggable queue to be used with the fast path executor or normal balanced executor (#3944)
2be2072 is described below
commit 2be20727040b367b7f009d4651fa78b842f9b2c3
Author: Richard Marscher <rm...@users.noreply.github.com>
AuthorDate: Fri Feb 18 13:00:10 2022 -0500
HBASE-26576 Allow pluggable queue to be used with the fast path executor or normal balanced executor (#3944)
Signed-off-by: Andrew Purtell <ap...@apache.org>
---
.../org/apache/hadoop/hbase/ipc/RpcExecutor.java | 7 ++++
.../hadoop/hbase/ipc/SimpleRpcScheduler.java | 4 +-
.../hadoop/hbase/ipc/TestSimpleRpcScheduler.java | 43 ++++++++++++++++++++--
3 files changed, 49 insertions(+), 5 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 1d5a970..a5685ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -82,6 +82,8 @@ public abstract class RpcExecutor {
public static final String PLUGGABLE_CALL_QUEUE_CLASS_NAME =
"hbase.ipc.server.callqueue.pluggable.queue.class.name";
+ public static final String PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED =
+ "hbase.ipc.server.callqueue.pluggable.queue.fast.path.enabled";
private LongAdder numGeneralCallsDropped = new LongAdder();
private LongAdder numLifoModeSwitches = new LongAdder();
@@ -380,6 +382,11 @@ public abstract class RpcExecutor {
return callQueueType.equals(CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE);
}
+ public static boolean isPluggableQueueWithFastPath(String callQueueType, Configuration conf) {
+ return isPluggableQueueType(callQueueType) &&
+ conf.getBoolean(PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, false);
+ }
+
private Optional<Class<? extends BlockingQueue<CallRunner>>> getPluggableQueueClass() {
String queueClassName = conf.get(PLUGGABLE_CALL_QUEUE_CLASS_NAME);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 1b8887a..be0d531 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -88,7 +88,9 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount),
maxQueueLength, priority, conf, server);
} else {
- if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) {
+ if (RpcExecutor.isFifoQueueType(callQueueType) ||
+ RpcExecutor.isCodelQueueType(callQueueType) ||
+ RpcExecutor.isPluggableQueueWithFastPath(callQueueType, conf)) {
callExecutor = new FastPathBalancedQueueRpcExecutor("default.FPBQ", handlerCount,
maxQueueLength, priority, conf, server);
} else {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 286094c..3f05e86 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Before;
import org.junit.ClassRule;
@@ -275,6 +276,41 @@ public class TestSimpleRpcScheduler {
}
@Test
+ public void testPluggableRpcQueueWireUpWithFastPathExecutor() throws Exception {
+ String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE;
+ Configuration schedConf = HBaseConfiguration.create();
+ schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
+ schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
+ schedConf.setBoolean(RpcExecutor.PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, true);
+
+ PriorityFunction priority = mock(PriorityFunction.class);
+ when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
+ SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority,
+ HConstants.QOS_THRESHOLD);
+
+ Field f = scheduler.getClass().getDeclaredField("callExecutor");
+ f.setAccessible(true);
+ assertTrue(f.get(scheduler) instanceof FastPathBalancedQueueRpcExecutor);
+ }
+
+ @Test
+ public void testPluggableRpcQueueWireUpWithoutFastPathExecutor() throws Exception {
+ String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE;
+ Configuration schedConf = HBaseConfiguration.create();
+ schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
+ schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
+
+ PriorityFunction priority = mock(PriorityFunction.class);
+ when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
+ SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority,
+ HConstants.QOS_THRESHOLD);
+
+ Field f = scheduler.getClass().getDeclaredField("callExecutor");
+ f.setAccessible(true);
+ assertTrue(f.get(scheduler) instanceof BalancedQueueRpcExecutor);
+ }
+
+ @Test
public void testPluggableRpcQueueCanListenToConfigurationChanges() throws Exception {
Configuration schedConf = HBaseConfiguration.create();
@@ -316,9 +352,7 @@ public class TestSimpleRpcScheduler {
testRpcScheduler(queueType, null);
}
- private void testRpcScheduler(final String queueType, final String pluggableQueueClass)
- throws Exception {
-
+ private void testRpcScheduler(final String queueType, final String pluggableQueueClass) throws Exception {
Configuration schedConf = HBaseConfiguration.create();
schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
@@ -388,7 +422,8 @@ public class TestSimpleRpcScheduler {
// -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
assertEquals(530, totalTime);
- } else if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) {
+ } else if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE) ||
+ queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE)) {
assertEquals(930, totalTime);
}
} finally {