You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2014/07/15 00:09:25 UTC
[3/3] git commit: HBASE-11513 Combine SingleMultiple Queue
RpcExecutor into a single class (Jesse Yates)
HBASE-11513 Combine SingleMultiple Queue RpcExecutor into a single class (Jesse Yates)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/22f205b0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/22f205b0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/22f205b0
Branch: refs/heads/branch-1
Commit: 22f205b09b3f4c43a2ef95a177b9c66f4614cde4
Parents: 044d62a
Author: Andrew Purtell <ap...@apache.org>
Authored: Mon Jul 14 14:39:59 2014 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon Jul 14 15:05:16 2014 -0700
----------------------------------------------------------------------
.../hbase/ipc/BalancedQueueRpcExecutor.java | 127 +++++++++++++++++++
.../hbase/ipc/MultipleQueueRpcExecutor.java | 82 ------------
.../hadoop/hbase/ipc/SimpleRpcScheduler.java | 27 ++--
.../hbase/ipc/SingleQueueRpcExecutor.java | 66 ----------
4 files changed, 136 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/22f205b0/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
new file mode 100644
index 0000000..7cf2101
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An {@link RpcExecutor} that will balance requests evenly across all its queues, but still remains
+ * efficient with a single queue via an inlinable queue balancing mechanism.
+ */
+@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
+@InterfaceStability.Evolving
+public class BalancedQueueRpcExecutor extends RpcExecutor {
+
+ protected final List<BlockingQueue<CallRunner>> queues;
+ private QueueBalancer balancer;
+
+ public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+ final int maxQueueLength) {
+ this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength);
+ }
+
+ public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+ final Class<? extends BlockingQueue> queueClass, Object... initargs) {
+ super(name, Math.max(handlerCount, numQueues));
+ queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
+ this.balancer = getBalancer(numQueues);
+ initializeQueues(numQueues, queueClass, initargs);
+ }
+
+ protected void initializeQueues(final int numQueues,
+ final Class<? extends BlockingQueue> queueClass, Object... initargs) {
+ for (int i = 0; i < numQueues; ++i) {
+ queues.add((BlockingQueue<CallRunner>) ReflectionUtils.newInstance(queueClass, initargs));
+ }
+ }
+
+ @Override
+ public void dispatch(final CallRunner callTask) throws InterruptedException {
+ int queueIndex = balancer.getNextQueue();
+ queues.get(queueIndex).put(callTask);
+ }
+
+ @Override
+ public int getQueueLength() {
+ int length = 0;
+ for (final BlockingQueue<CallRunner> queue : queues) {
+ length += queue.size();
+ }
+ return length;
+ }
+
+ @Override
+ public List<BlockingQueue<CallRunner>> getQueues() {
+ return queues;
+ }
+
+ private static abstract class QueueBalancer {
+ /**
+ * @return the index of the next queue to which a request should be inserted
+ */
+ public abstract int getNextQueue();
+ }
+
+ public static QueueBalancer getBalancer(int queueSize) {
+ Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1");
+ if (queueSize == 1) {
+ return ONE_QUEUE;
+ } else {
+ return new RandomQueueBalancer(queueSize);
+ }
+ }
+
+ /**
+ * All requests go to the first queue, at index 0
+ */
+ private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
+
+ @Override
+ public int getNextQueue() {
+ return 0;
+ }
+ };
+
+ /**
+ * Queue balancer that just randomly selects a queue in the range [0, num queues).
+ */
+ private static class RandomQueueBalancer extends QueueBalancer {
+ private int queueSize;
+ private Random random;
+
+ public RandomQueueBalancer(int queueSize) {
+ this.queueSize = queueSize;
+ this.random = new Random();
+ }
+
+ public int getNextQueue() {
+ return random.nextInt(queueSize);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/22f205b0/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java
deleted file mode 100644
index 71ddfa6..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.ipc;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
-
-/**
- * RPC Executor that dispatch the requests on multiple queues.
- * Each handler has its own queue and there is no stealing.
- */
-@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
-@InterfaceStability.Evolving
-public class MultipleQueueRpcExecutor extends RpcExecutor {
- protected final List<BlockingQueue<CallRunner>> queues;
- protected final Random balancer = new Random();
-
- public MultipleQueueRpcExecutor(final String name, final int handlerCount,
- final int numQueues, final int maxQueueLength) {
- this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength);
- }
-
- public MultipleQueueRpcExecutor(final String name, final int handlerCount,
- final int numQueues,
- final Class<? extends BlockingQueue> queueClass, Object... initargs) {
- super(name, Math.max(handlerCount, numQueues));
- queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
- initializeQueues(numQueues, queueClass, initargs);
- }
-
- protected void initializeQueues(final int numQueues,
- final Class<? extends BlockingQueue> queueClass, Object... initargs) {
- for (int i = 0; i < numQueues; ++i) {
- queues.add((BlockingQueue<CallRunner>)
- ReflectionUtils.newInstance(queueClass, initargs));
- }
- }
-
- @Override
- public void dispatch(final CallRunner callTask) throws InterruptedException {
- int queueIndex = balancer.nextInt(queues.size());
- queues.get(queueIndex).put(callTask);
- }
-
- @Override
- public int getQueueLength() {
- int length = 0;
- for (final BlockingQueue<CallRunner> queue: queues) {
- length += queue.size();
- }
- return length;
- }
-
- @Override
- protected List<BlockingQueue<CallRunner>> getQueues() {
- return queues;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/22f205b0/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 4b46595..953bc36 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -127,33 +127,24 @@ public class SimpleRpcScheduler extends RpcScheduler {
callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
callqReadShare, maxQueueLength);
}
- } else if (numCallQueues > 1) {
+ } else {
// multiple queues
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
- callExecutor = new MultipleQueueRpcExecutor("default", handlerCount, numCallQueues,
+ callExecutor = new BalancedQueueRpcExecutor("default", handlerCount, numCallQueues,
BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
} else {
- callExecutor = new MultipleQueueRpcExecutor("default", handlerCount,
+ callExecutor = new BalancedQueueRpcExecutor("default", handlerCount,
numCallQueues, maxQueueLength);
}
- } else {
- // Single queue
- if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
- CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
- callExecutor = new SingleQueueRpcExecutor("default", handlerCount,
- BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
- } else {
- callExecutor = new SingleQueueRpcExecutor("default", handlerCount, maxQueueLength);
- }
}
- this.priorityExecutor = priorityHandlerCount > 0
- ? new SingleQueueRpcExecutor("Priority", priorityHandlerCount, maxQueueLength)
- : null;
- this.replicationExecutor = replicationHandlerCount > 0
- ? new SingleQueueRpcExecutor("Replication", replicationHandlerCount, maxQueueLength)
- : null;
+ this.priorityExecutor =
+ priorityHandlerCount > 0 ? new BalancedQueueRpcExecutor("Priority", priorityHandlerCount,
+ 1, maxQueueLength) : null;
+ this.replicationExecutor =
+ replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",
+ replicationHandlerCount, 1, maxQueueLength) : null;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/22f205b0/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java
deleted file mode 100644
index b94b14b..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.ipc;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * RPC Executor that uses a single queue for all the requests.
- */
-@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
-@InterfaceStability.Evolving
-public class SingleQueueRpcExecutor extends RpcExecutor {
- private final BlockingQueue<CallRunner> queue;
-
- public SingleQueueRpcExecutor(final String name, final int handlerCount,
- final int maxQueueLength) {
- this(name, handlerCount, LinkedBlockingQueue.class, maxQueueLength);
- }
-
- public SingleQueueRpcExecutor(final String name, final int handlerCount,
- final Class<? extends BlockingQueue> queueClass, Object... initargs) {
- super(name, handlerCount);
- queue = (BlockingQueue<CallRunner>)ReflectionUtils.newInstance(queueClass, initargs);
- }
-
- @Override
- public void dispatch(final CallRunner callTask) throws InterruptedException {
- queue.put(callTask);
- }
-
- @Override
- public int getQueueLength() {
- return queue.size();
- }
-
- @Override
- protected List<BlockingQueue<CallRunner>> getQueues() {
- List<BlockingQueue<CallRunner>> list = new ArrayList<BlockingQueue<CallRunner>>(1);
- list.add(queue);
- return list;
- }
-}