You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by re...@apache.org on 2018/08/08 02:39:31 UTC
hbase git commit: HBASE-20965 Separate region server report requests
to new handlers
Repository: hbase
Updated Branches:
refs/heads/master b9413839a -> 48d387413
HBASE-20965 Separate region server report requests to new handlers
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/48d38741
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/48d38741
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/48d38741
Branch: refs/heads/master
Commit: 48d387413f012cd6bfecc42085f7432647975780
Parents: b941383
Author: meiyi <my...@gamil.com>
Authored: Tue Aug 7 14:29:42 2018 +0800
Committer: Reid Chan <re...@apache.org>
Committed: Wed Aug 8 10:39:11 2018 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/HConstants.java | 2 +
.../hadoop/hbase/ipc/FifoRpcScheduler.java | 36 ++--
.../hbase/ipc/MasterFifoRpcScheduler.java | 108 ++++++++++++
.../hadoop/hbase/master/MasterRpcServices.java | 10 ++
.../MasterFifoRpcSchedulerFactory.java | 46 +++++
.../hbase/regionserver/RSRpcServices.java | 20 ++-
.../hbase/ipc/TestMasterFifoRpcScheduler.java | 168 +++++++++++++++++++
7 files changed, 376 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/48d38741/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index beb65fa..436426f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1022,6 +1022,8 @@ public final class HConstants {
public static final String REGION_SERVER_HANDLER_COUNT = "hbase.regionserver.handler.count";
public static final int DEFAULT_REGION_SERVER_HANDLER_COUNT = 30;
+ /**
+ * Configuration key for the number of RPC handlers set aside for region server report
+ * requests; it is read by MasterFifoRpcSchedulerFactory, which treats 0 as "not configured".
+ */
+ public static final String REGION_SERVER_REPORT_HANDLER_COUNT =
+ "hbase.regionserver.report.handler.count";
/*
* REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT:
http://git-wip-us.apache.org/repos/asf/hbase/blob/48d38741/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
index bd8bdce..a3fa684 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -39,10 +40,10 @@ import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
@InterfaceAudience.Private
public class FifoRpcScheduler extends RpcScheduler {
private static final Logger LOG = LoggerFactory.getLogger(FifoRpcScheduler.class);
- private final int handlerCount;
- private final int maxQueueLength;
- private final AtomicInteger queueSize = new AtomicInteger(0);
- private ThreadPoolExecutor executor;
+ protected final int handlerCount;
+ protected final int maxQueueLength;
+ protected final AtomicInteger queueSize = new AtomicInteger(0);
+ protected ThreadPoolExecutor executor;
public FifoRpcScheduler(Configuration conf, int handlerCount) {
this.handlerCount = handlerCount;
@@ -94,6 +95,11 @@ public class FifoRpcScheduler extends RpcScheduler {
@Override
public boolean dispatch(final CallRunner task) throws IOException, InterruptedException {
+ // Delegate to executeRpcCall with this scheduler's own executor and queue-size counter; the
+ // helper takes both as parameters so a subclass can dispatch to a different pool.
+ return executeRpcCall(executor, queueSize, task);
+ }
+
+ protected boolean executeRpcCall(final ThreadPoolExecutor executor, final AtomicInteger queueSize,
+ final CallRunner task) {
// Executors provide no offer, so make our own.
int queued = queueSize.getAndIncrement();
if (maxQueueLength > 0 && queued >= maxQueueLength) {
@@ -199,15 +205,19 @@ public class FifoRpcScheduler extends RpcScheduler {
callQueueInfo.setCallMethodCount(queueName, methodCount);
callQueueInfo.setCallMethodSize(queueName, methodSize);
+ updateMethodCountAndSizeByQueue(executor.getQueue(), methodCount, methodSize);
+
+ return callQueueInfo;
+ }
- for (Runnable r:executor.getQueue()) {
+ protected void updateMethodCountAndSizeByQueue(BlockingQueue<Runnable> queue,
+ HashMap<String, Long> methodCount, HashMap<String, Long> methodSize) {
+ for (Runnable r : queue) {
FifoCallRunner mcr = (FifoCallRunner) r;
RpcCall rpcCall = mcr.getCallRunner().getRpcCall();
- String method;
-
- if (null==rpcCall.getMethod() ||
- StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
+ String method = getCallMethod(mcr.getCallRunner());
+ if (StringUtil.isNullOrEmpty(method)) {
method = "Unknown";
}
@@ -216,7 +226,13 @@ public class FifoRpcScheduler extends RpcScheduler {
methodCount.put(method, 1 + methodCount.getOrDefault(method, 0L));
methodSize.put(method, size + methodSize.getOrDefault(method, 0L));
}
+ }
- return callQueueInfo;
+ /**
+  * Looks up the name of the RPC method that the given task is going to invoke.
+  *
+  * @param task the call runner wrapping the RPC call
+  * @return the method name, or {@code null} when the task carries no RPC call or the call has
+  *         no method set
+  */
+ protected String getCallMethod(final CallRunner task) {
+   final RpcCall rpcCall = task.getRpcCall();
+   if (rpcCall == null || rpcCall.getMethod() == null) {
+     return null;
+   }
+   return rpcCall.getMethod().getName();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/48d38741/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java
new file mode 100644
index 0000000..01a8dcf
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RPC scheduler used only by the master.
+ * <p>
+ * On top of the general handler pool inherited from {@link FifoRpcScheduler}, it owns a second
+ * pool that serves nothing but {@code RegionServerReport} calls, so region server reports and all
+ * other calls are handled by separate threads.
+ */
+@InterfaceAudience.Private
+public class MasterFifoRpcScheduler extends FifoRpcScheduler {
+  private static final Logger LOG = LoggerFactory.getLogger(MasterFifoRpcScheduler.class);
+
+  /** Name of the RPC method whose calls are routed to the report handler pool. */
+  private static final String REGION_SERVER_REPORT = "RegionServerReport";
+  private final int rsReportHandlerCount;
+  private final int rsReportMaxQueueLength;
+  /** Counter handed to {@code executeRpcCall} for calls sent to the report pool. */
+  private final AtomicInteger rsReportQueueSize = new AtomicInteger(0);
+  /** Created in {@link #start()}; {@code null} until then. */
+  private ThreadPoolExecutor rsReportExecutor;
+
+  /**
+   * @param conf configuration; {@link RpcScheduler#IPC_SERVER_MAX_CALLQUEUE_LENGTH} overrides the
+   *          default length of the report queue
+   * @param callHandlerCount number of handler threads for every call except region server reports
+   * @param rsReportHandlerCount number of handler threads dedicated to region server reports
+   */
+  public MasterFifoRpcScheduler(Configuration conf, int callHandlerCount,
+      int rsReportHandlerCount) {
+    super(conf, callHandlerCount);
+    this.rsReportHandlerCount = rsReportHandlerCount;
+    // NOTE(review): this value only sizes the report executor's queue in start();
+    // FifoRpcScheduler#executeRpcCall appears to check the inherited maxQueueLength whichever
+    // counter it is given -- confirm that is the intended admission limit for report calls.
+    this.rsReportMaxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
+      rsReportHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+  }
+
+  /** Creates the general call pool and the region server report pool. */
+  @Override
+  public void start() {
+    LOG.info("Starting call handlers: count={}, maxQueueLength={}", handlerCount, maxQueueLength);
+    LOG.info("Starting RSReport handlers: count={}, maxQueueLength={}", rsReportHandlerCount,
+      rsReportMaxQueueLength);
+    this.executor =
+      newHandlerPool(handlerCount, maxQueueLength, "MasterFifoRpcScheduler.call.handler");
+    this.rsReportExecutor = newHandlerPool(rsReportHandlerCount, rsReportMaxQueueLength,
+      "MasterFifoRpcScheduler.RSReport.handler");
+  }
+
+  /**
+   * Builds a fixed-size handler pool backed by a bounded {@link ArrayBlockingQueue}. When the
+   * queue is full, {@link ThreadPoolExecutor.CallerRunsPolicy} runs the task on the submitting
+   * thread instead of rejecting it.
+   */
+  private static ThreadPoolExecutor newHandlerPool(int handlers, int queueLength,
+      String threadNamePrefix) {
+    return new ThreadPoolExecutor(handlers, handlers, 60, TimeUnit.SECONDS,
+      new ArrayBlockingQueue<Runnable>(queueLength), new DaemonThreadFactory(threadNamePrefix),
+      new ThreadPoolExecutor.CallerRunsPolicy());
+  }
+
+  /** Requests an orderly shutdown of both pools. */
+  @Override
+  public void stop() {
+    this.executor.shutdown();
+    this.rsReportExecutor.shutdown();
+  }
+
+  /**
+   * Sends {@code RegionServerReport} calls to the report pool and everything else to the general
+   * pool.
+   *
+   * @return the result of {@code executeRpcCall} for the chosen pool
+   */
+  @Override
+  public boolean dispatch(final CallRunner task) throws IOException, InterruptedException {
+    String method = getCallMethod(task);
+    // Constant-first equals also covers a null method name.
+    if (rsReportExecutor != null && REGION_SERVER_REPORT.equals(method)) {
+      return executeRpcCall(rsReportExecutor, rsReportQueueSize, task);
+    }
+    return executeRpcCall(executor, queueSize, task);
+  }
+
+  /** @return the number of calls waiting in both queues combined */
+  @Override
+  public int getGeneralQueueLength() {
+    return executor.getQueue().size() + rsReportExecutor.getQueue().size();
+  }
+
+  /** @return the number of active handler threads in both pools combined */
+  @Override
+  public int getActiveRpcHandlerCount() {
+    return executor.getActiveCount() + rsReportExecutor.getActiveCount();
+  }
+
+  /**
+   * Reports the queued calls of both pools under the single queue name
+   * {@code "Master Fifo Queue"}.
+   */
+  @Override
+  public CallQueueInfo getCallQueueInfo() {
+    String queueName = "Master Fifo Queue";
+
+    HashMap<String, Long> methodCount = new HashMap<>();
+    HashMap<String, Long> methodSize = new HashMap<>();
+
+    CallQueueInfo callQueueInfo = new CallQueueInfo();
+    callQueueInfo.setCallMethodCount(queueName, methodCount);
+    callQueueInfo.setCallMethodSize(queueName, methodSize);
+
+    // The maps registered above are filled in place from both queues.
+    updateMethodCountAndSizeByQueue(executor.getQueue(), methodCount, methodSize);
+    updateMethodCountAndSizeByQueue(rsReportExecutor.getQueue(), methodCount, methodSize);
+
+    return callQueueInfo;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/48d38741/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index a4d9ff8..9ebbd3c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -336,6 +336,16 @@ public class MasterRpcServices extends RSRpcServices
}
@Override
+ protected Class<?> getRpcSchedulerFactoryClass() {
+   // Prefer the master-specific factory setting; with no configuration, or no value under the
+   // master key, use whatever the parent class resolves.
+   final Configuration conf = getConfiguration();
+   final Class<?> fallback = super.getRpcSchedulerFactoryClass();
+   return conf == null
+     ? fallback
+     : conf.getClass(MASTER_RPC_SCHEDULER_FACTORY_CLASS, fallback);
+ }
+
+ @Override
protected RpcServerInterface createRpcServer(Server server, Configuration conf,
RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name)
throws IOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/48d38741/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MasterFifoRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MasterFifoRpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MasterFifoRpcSchedulerFactory.java
new file mode 100644
index 0000000..1e0a4e8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MasterFifoRpcSchedulerFactory.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ipc.MasterFifoRpcScheduler;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Factory to use when you want to use the {@link MasterFifoRpcScheduler}.
+ * <p>
+ * The handler budget read from {@link HConstants#REGION_SERVER_HANDLER_COUNT} is split between
+ * ordinary call handlers and region server report handlers.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MasterFifoRpcSchedulerFactory extends FifoRpcSchedulerFactory {
+  /**
+   * Creates a {@link MasterFifoRpcScheduler} with at least one handler in each pool.
+   * <p>
+   * The report pool size comes from {@link HConstants#REGION_SERVER_REPORT_HANDLER_COUNT}; when
+   * that is unset or not positive, half of the total (minimum 1) is used. The call pool gets the
+   * remainder, again with a minimum of 1, so the two pools together may exceed the configured
+   * total when the report count is at least as large as the total.
+   *
+   * @param conf configuration to read the handler counts from
+   * @param priority not used by this factory
+   * @param server not used by this factory
+   * @return the scheduler for the master
+   */
+  @Override
+  public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
+    int totalHandlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+    int rsReportHandlerCount = conf.getInt(HConstants.REGION_SERVER_REPORT_HANDLER_COUNT, 0);
+    // Zero means "not configured". A negative value is a misconfiguration that a thread pool
+    // constructor would reject with IllegalArgumentException, so it falls back the same way.
+    if (rsReportHandlerCount <= 0) {
+      rsReportHandlerCount = Math.max(1, totalHandlerCount / 2);
+    }
+    int callHandlerCount = Math.max(1, totalHandlerCount - rsReportHandlerCount);
+    return new MasterFifoRpcScheduler(conf, callHandlerCount, rsReportHandlerCount);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/48d38741/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index e292ce1..cb97d35 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -94,6 +94,7 @@ import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.QosPriority;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.RpcCallback;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
@@ -257,6 +258,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
"hbase.region.server.rpc.scheduler.factory.class";
+ /** RPC scheduler to use for the master. */
+ public static final String MASTER_RPC_SCHEDULER_FACTORY_CLASS =
+ "hbase.master.rpc.scheduler.factory.class";
+
/**
* Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
* configuration exists to prevent the scenario where a time limit is specified to be so
@@ -1203,10 +1208,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
RpcSchedulerFactory rpcSchedulerFactory;
try {
- Class<?> cls = rs.conf.getClass(
- REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
- SimpleRpcSchedulerFactory.class);
- rpcSchedulerFactory = cls.asSubclass(RpcSchedulerFactory.class)
+ rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class)
.getDeclaredConstructor().newInstance();
} catch (NoSuchMethodException | InvocationTargetException |
InstantiationException | IllegalAccessException e) {
@@ -1283,6 +1285,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
+ /**
+  * Resolves the {@link RpcSchedulerFactory} implementation to instantiate for this server.
+  * Subclasses may override this to consult a different configuration key.
+  *
+  * @return the class configured under {@link #REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS}, or
+  *         {@link SimpleRpcSchedulerFactory} when nothing is configured
+  */
+ protected Class<?> getRpcSchedulerFactoryClass() {
+   final Class<?> defaultFactory = SimpleRpcSchedulerFactory.class;
+   return this.regionServer.conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
+     defaultFactory);
+ }
+
@Override
public void onConfigurationChange(Configuration newConf) {
if (rpcServer instanceof ConfigurationObserver) {
@@ -3700,4 +3707,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
throw new ServiceException(e);
}
}
+
+ /**
+ * Exposes the scheduler of the underlying RPC server; annotated as visible for testing.
+ * @return whatever {@code rpcServer.getScheduler()} returns
+ */
+ @VisibleForTesting
+ public RpcScheduler getRpcScheduler() {
+ return rpcServer.getScheduler();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/48d38741/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMasterFifoRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMasterFifoRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMasterFifoRpcScheduler.java
new file mode 100644
index 0000000..04eb82d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMasterFifoRpcScheduler.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterRpcServices;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
+
+/**
+ * Tests for {@link MasterFifoRpcScheduler}: that the master picks it up through the
+ * master-specific factory setting, and that its call queue info covers both handler pools.
+ */
+@Category({ RPCTests.class, LargeTests.class })
+public class TestMasterFifoRpcScheduler {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestMasterFifoRpcScheduler.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestMasterFifoRpcScheduler.class);
+
+  private static final String REGION_SERVER_REPORT = "RegionServerReport";
+  private static final String OTHER = "Other";
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  /** Starts a mini cluster whose master is configured to use the master FIFO scheduler. */
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(RSRpcServices.MASTER_RPC_SCHEDULER_FACTORY_CLASS,
+      "org.apache.hadoop.hbase.regionserver.MasterFifoRpcSchedulerFactory");
+    conf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 5);
+    conf.setInt(HConstants.REGION_SERVER_REPORT_HANDLER_COUNT, 2);
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /** The running master must have been given a {@link MasterFifoRpcScheduler}. */
+  @Test
+  public void testMasterRpcScheduler() {
+    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+    MasterRpcServices masterRpcServices = master.getMasterRpcServices();
+    RpcScheduler masterRpcScheduler = masterRpcServices.getRpcScheduler();
+    Assert.assertTrue(masterRpcScheduler instanceof MasterFifoRpcScheduler);
+  }
+
+  /**
+   * Dispatches 30 slow calls (the first 11 named "Other", the remaining 19 named
+   * "RegionServerReport") and checks that every accepted call is either counted as started or
+   * still listed in the single reported queue.
+   */
+  @Test
+  public void testCallQueueInfo() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    AtomicInteger callExecutionCount = new AtomicInteger(0);
+
+    RpcScheduler scheduler = new MockMasterFifoRpcScheduler(conf, 2, 1);
+    scheduler.start();
+    // Stop the scheduler even when an assertion fails so its handler pools are not left running.
+    try {
+      int totalCallMethods = 30;
+      int unableToDispatch = 0;
+
+      for (int i = totalCallMethods; i > 0; i--) {
+        CallRunner task = createMockTask(callExecutionCount, i < 20);
+        if (!scheduler.dispatch(task)) {
+          unableToDispatch++;
+        }
+        Thread.sleep(10);
+      }
+
+      CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
+      int executionCount = callExecutionCount.get();
+
+      String expectedQueueName = "Master Fifo Queue";
+      assertEquals(1, callQueueInfo.getCallQueueNames().size());
+
+      long callQueueSize = 0;
+      for (String queueName : callQueueInfo.getCallQueueNames()) {
+        assertEquals(expectedQueueName, queueName);
+        Set<String> methodNames = callQueueInfo.getCalledMethodNames(queueName);
+        if (methodNames.size() == 2) {
+          assertTrue(methodNames.contains(REGION_SERVER_REPORT));
+          assertTrue(methodNames.contains(OTHER));
+        }
+        for (String methodName : callQueueInfo.getCalledMethodNames(queueName)) {
+          callQueueSize += callQueueInfo.getCallMethodCount(queueName, methodName);
+        }
+      }
+
+      assertEquals(totalCallMethods - unableToDispatch, callQueueSize + executionCount);
+    } finally {
+      scheduler.stop();
+    }
+  }
+
+  /**
+   * Builds a mocked call whose header carries the requested method name and whose {@code run()}
+   * bumps the shared counter and then sleeps for a second, keeping its handler busy.
+   */
+  private CallRunner createMockTask(AtomicInteger callExecutionCount,
+      boolean isRegionServerReportTask) {
+    CallRunner task = mock(CallRunner.class);
+    ServerCall call = mock(ServerCall.class);
+    when(task.getRpcCall()).thenReturn(call);
+    when(call.getHeader()).thenReturn(RPCProtos.RequestHeader.newBuilder()
+      .setMethodName(isRegionServerReportTask ? REGION_SERVER_REPORT : OTHER).build());
+
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        callExecutionCount.incrementAndGet();
+        Thread.sleep(1000);
+        return null;
+      }
+    }).when(task).run();
+
+    return task;
+  }
+
+  private static class MockMasterFifoRpcScheduler extends MasterFifoRpcScheduler {
+
+    public MockMasterFifoRpcScheduler(Configuration conf, int callHandlerCount,
+        int rsReportHandlerCount) {
+      super(conf, callHandlerCount, rsReportHandlerCount);
+    }
+
+    /**
+     * Override this method because we can't mock a Descriptors.MethodDescriptor; the method name
+     * is taken from the request header instead.
+     */
+    @Override
+    protected String getCallMethod(final CallRunner task) {
+      RpcCall call = task.getRpcCall();
+      if (call != null && call.getHeader() != null) {
+        return call.getHeader().getMethodName();
+      }
+      return null;
+    }
+  }
+}