You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/02/27 00:09:27 UTC
[06/37] hbase git commit: HBASE-15306 Make RPC call queue length dynamically configurable
HBASE-15306 Make RPC call queue length dynamically configurable
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f47dba74
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f47dba74
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f47dba74
Branch: refs/heads/hbase-12439
Commit: f47dba74d498d5d39f124ad8ea5723c437acbc85
Parents: 58283fa
Author: Mikhail Antonov <an...@apache.org>
Authored: Tue Feb 23 14:20:40 2016 -0800
Committer: Mikhail Antonov <an...@apache.org>
Committed: Tue Feb 23 14:20:40 2016 -0800
----------------------------------------------------------------------
.../hbase/ipc/BalancedQueueRpcExecutor.java | 11 +++++-
.../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 19 +++++++++-
.../apache/hadoop/hbase/ipc/RpcExecutor.java | 11 ++++++
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 3 ++
.../hadoop/hbase/ipc/SimpleRpcScheduler.java | 18 ++++++++-
.../hbase/ipc/TestSimpleRpcScheduler.java | 39 ++++++++++++++++++++
6 files changed, 97 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f47dba74/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
index 79b4ec8..e4205eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
@@ -66,6 +66,10 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
protected void initializeQueues(final int numQueues,
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
+ if (initargs.length > 0) {
+ currentQueueLimit = (int) initargs[0];
+ initargs[0] = Math.max((int) initargs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
+ }
for (int i = 0; i < numQueues; ++i) {
queues.add((BlockingQueue<CallRunner>) ReflectionUtils.newInstance(queueClass, initargs));
}
@@ -74,7 +78,12 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
int queueIndex = balancer.getNextQueue();
- return queues.get(queueIndex).offer(callTask);
+ BlockingQueue<CallRunner> queue = queues.get(queueIndex);
+ // the size check and the offer below are separate steps, so we can overflow the soft limit by at most <num reader> size (5), that's ok
+ if (queue.size() >= currentQueueLimit) {
+ return false;
+ }
+ return queue.offer(callTask);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/f47dba74/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 544370d..a9648b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -139,12 +139,22 @@ public class RWQueueRpcExecutor extends RpcExecutor {
" readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount +
((numScanQueues == 0) ? "" : " scanQueues=" + numScanQueues +
" scanHandlers=" + scanHandlersCount));
-
+ if (writeQueueInitArgs.length > 0) {
+ currentQueueLimit = (int) writeQueueInitArgs[0];
+ writeQueueInitArgs[0] = Math.max((int) writeQueueInitArgs[0],
+ DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
+ }
for (int i = 0; i < numWriteQueues; ++i) {
+
queues.add((BlockingQueue<CallRunner>)
ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
}
+ if (readQueueInitArgs.length > 0) {
+ currentQueueLimit = (int) readQueueInitArgs[0];
+ readQueueInitArgs[0] = Math.max((int) readQueueInitArgs[0],
+ DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
+ }
for (int i = 0; i < (numReadQueues + numScanQueues); ++i) {
queues.add((BlockingQueue<CallRunner>)
ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
@@ -170,7 +180,12 @@ public class RWQueueRpcExecutor extends RpcExecutor {
} else {
queueIndex = numWriteQueues + readBalancer.getNextQueue();
}
- return queues.get(queueIndex).offer(callTask);
+
+ BlockingQueue<CallRunner> queue = queues.get(queueIndex);
+ if (queue.size() >= currentQueueLimit) {
+ return false;
+ }
+ return queue.offer(callTask);
}
private boolean isWriteRequest(final RequestHeader header, final Message param) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/f47dba74/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 017bf39..22cb195 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -42,6 +42,9 @@ import com.google.common.base.Strings;
public abstract class RpcExecutor {
private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
+ protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
+ protected volatile int currentQueueLimit;
+
private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
private final List<Thread> handlers;
private final int handlerCount;
@@ -210,4 +213,12 @@ public abstract class RpcExecutor {
return ThreadLocalRandom.current().nextInt(queueSize);
}
}
+
+ /**
+ * Update current soft limit for executor's call queues
+ * @param conf updated configuration
+ */
+ public void resizeQueues(Configuration conf) {
+ currentQueueLimit = conf.getInt("hbase.ipc.server.max.callqueue.length", currentQueueLimit);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f47dba74/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 58fc598..6ddfb9a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -2099,6 +2099,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
@Override
public void onConfigurationChange(Configuration newConf) {
initReconfigurable(newConf);
+ if (scheduler instanceof ConfigurationObserver) {
+ ((ConfigurationObserver)scheduler).onConfigurationChange(newConf);
+ }
}
private void initReconfigurable(Configuration confToLoad) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/f47dba74/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 8de714d..0003254 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
/**
@@ -36,7 +37,7 @@ import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
*/
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
@InterfaceStability.Evolving
-public class SimpleRpcScheduler extends RpcScheduler {
+public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver {
private static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
@@ -56,6 +57,21 @@ public class SimpleRpcScheduler extends RpcScheduler {
= "hbase.ipc.server.queue.max.call.delay";
/**
+ * Resize call queues.
+ * @param conf new configuration
+ */
+ @Override
+ public void onConfigurationChange(Configuration conf) {
+ callExecutor.resizeQueues(conf);
+ if (priorityExecutor != null) {
+ priorityExecutor.resizeQueues(conf);
+ }
+ if (replicationExecutor != null) {
+ replicationExecutor.resizeQueues(conf);
+ }
+ }
+
+ /**
* Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true.
* It uses the calculated "deadline" e.g. to deprioritize long-running job
*
http://git-wip-us.apache.org/repos/asf/hbase/blob/f47dba74/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index db992cd..66032e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -56,7 +56,9 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
@@ -323,4 +325,41 @@ public class TestSimpleRpcScheduler {
}
}).when(callTask).run();
}
+
+ @Test
+ public void testSoftAndHardQueueLimits() throws Exception {
+ Configuration schedConf = HBaseConfiguration.create();
+
+ schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0);
+ schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
+
+ PriorityFunction priority = mock(PriorityFunction.class);
+ when(priority.getPriority(any(RequestHeader.class), any(Message.class),
+ any(User.class))).thenReturn(HConstants.NORMAL_QOS);
+ SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority,
+ HConstants.QOS_THRESHOLD);
+ try {
+ scheduler.start();
+
+ CallRunner putCallTask = mock(CallRunner.class);
+ RpcServer.Call putCall = mock(RpcServer.Call.class);
+ putCall.param = RequestConverter.buildMutateRequest(
+ Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
+ RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
+ when(putCallTask.getCall()).thenReturn(putCall);
+ when(putCall.getHeader()).thenReturn(putHead);
+
+ assertTrue(scheduler.dispatch(putCallTask));
+
+ schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
+ scheduler.onConfigurationChange(schedConf);
+ assertFalse(scheduler.dispatch(putCallTask));
+
+ schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1);
+ scheduler.onConfigurationChange(schedConf);
+ assertTrue(scheduler.dispatch(putCallTask));
+ } finally {
+ scheduler.stop();
+ }
+ }
}