You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2014/07/01 10:35:29 UTC
[3/3] git commit: HBASE-11355 a couple of callQueue related
improvements
HBASE-11355 a couple of callQueue related improvements
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9a6a59c7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9a6a59c7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9a6a59c7
Branch: refs/heads/branch-1
Commit: 9a6a59c7b7d8357c50fd32f01d0ca21911db3da2
Parents: 0167558
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Tue Jul 1 09:29:07 2014 +0200
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Tue Jul 1 10:33:39 2014 +0200
----------------------------------------------------------------------
.../hadoop/hbase/util/ReflectionUtils.java | 52 +++++-
.../src/main/resources/hbase-default.xml | 16 ++
.../hbase/ipc/MultipleQueueRpcExecutor.java | 87 ++++++++++
.../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 163 +++++++++++++++++++
.../apache/hadoop/hbase/ipc/RpcExecutor.java | 128 +++++++++++++++
.../hadoop/hbase/ipc/SimpleRpcScheduler.java | 154 ++++++++----------
.../hbase/ipc/SingleQueueRpcExecutor.java | 71 ++++++++
7 files changed, 582 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/9a6a59c7/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
index cda5424..6629868 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.util;
+import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -26,13 +27,23 @@ import org.apache.hadoop.classification.InterfaceAudience;
public class ReflectionUtils {
@SuppressWarnings("unchecked")
public static <T> T instantiateWithCustomCtor(String className,
- Class<? >[] ctorArgTypes, Object[] ctorArgs) {
+ Class<? >[] ctorArgTypes, Object[] ctorArgs) {
try {
Class<? extends T> resultType = (Class<? extends T>) Class.forName(className);
- return resultType.getDeclaredConstructor(ctorArgTypes).newInstance(ctorArgs);
+ Constructor<? extends T> ctor = resultType.getDeclaredConstructor(ctorArgTypes);
+ return instantiate(className, ctor, ctorArgs);
} catch (ClassNotFoundException e) {
throw new UnsupportedOperationException(
"Unable to find " + className, e);
+ } catch (NoSuchMethodException e) {
+ throw new UnsupportedOperationException(
+ "Unable to find suitable constructor for class " + className, e);
+ }
+ }
+
+ private static <T> T instantiate(final String className, Constructor<T> ctor, Object[] ctorArgs) {
+ try {
+ return ctor.newInstance(ctorArgs);
} catch (IllegalAccessException e) {
throw new UnsupportedOperationException(
"Unable to access specified class " + className, e);
@@ -42,9 +53,40 @@ public class ReflectionUtils {
} catch (InvocationTargetException e) {
throw new UnsupportedOperationException(
"Constructor threw an exception for " + className, e);
- } catch (NoSuchMethodException e) {
- throw new UnsupportedOperationException(
- "Unable to find suitable constructor for class " + className, e);
}
}
+
+ @SuppressWarnings("unchecked")
+ public static <T> T newInstance(Class<T> type, Object... params) {
+ return instantiate(type.getName(), findConstructor(type, params), params);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <T> Constructor<T> findConstructor(Class<T> type, Object... paramTypes) {
+ Constructor<T>[] constructors = (Constructor<T>[])type.getConstructors();
+ for (Constructor<T> ctor : constructors) {
+ Class<?>[] ctorParamTypes = ctor.getParameterTypes();
+ if (ctorParamTypes.length != paramTypes.length) {
+ continue;
+ }
+
+ boolean match = true;
+ for (int i = 0; i < ctorParamTypes.length && match; ++i) {
+ Class<?> paramType = paramTypes[i].getClass();
+ match = (!ctorParamTypes[i].isPrimitive()) ? ctorParamTypes[i].isAssignableFrom(paramType) :
+ ((int.class.equals(ctorParamTypes[i]) && Integer.class.equals(paramType)) ||
+ (long.class.equals(ctorParamTypes[i]) && Long.class.equals(paramType)) ||
+ (char.class.equals(ctorParamTypes[i]) && Character.class.equals(paramType)) ||
+ (short.class.equals(ctorParamTypes[i]) && Short.class.equals(paramType)) ||
+ (boolean.class.equals(ctorParamTypes[i]) && Boolean.class.equals(paramType)) ||
+ (byte.class.equals(ctorParamTypes[i]) && Byte.class.equals(paramType)));
+ }
+
+ if (match) {
+ return ctor;
+ }
+ }
+ throw new UnsupportedOperationException(
+ "Unable to find suitable constructor for class " + type.getName());
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9a6a59c7/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 43723f8..70d20a7 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -181,6 +181,22 @@ possible configurations would overwhelm and obscure the important.
Same property is used by the Master for count of master handlers.</description>
</property>
<property>
+ <name>ipc.server.callqueue.handler.factor</name>
+ <value>0.1</value>
+ <description>Factor to determine the number of call queues.
+ A value of 0 means a single queue shared between all the handlers.
+ A value of 1 means that each handler has its own queue.</description>
+ </property>
+ <property>
+ <name>ipc.server.callqueue.read.share</name>
+ <value>0</value>
+ <description>Split the call queues into read and write queues.
+ A value of 0 indicates that the call queues are not split.
+ A value of 0.5 means there will be the same number of read and write queues.
+ A value of 1.0 means that all the queues except one are used to dispatch read requests.
+ </description>
+ </property>
+ <property>
<name>hbase.regionserver.msginterval</name>
<value>3000</value>
<description>Interval between messages from the RegionServer to Master
http://git-wip-us.apache.org/repos/asf/hbase/blob/9a6a59c7/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java
new file mode 100644
index 0000000..ab14906
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+import com.google.common.collect.Lists;
+
+/**
+ * RPC Executor that dispatches the requests on multiple queues.
+ * Each handler is bound to a single queue and there is no stealing.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MultipleQueueRpcExecutor extends RpcExecutor {
+ protected final List<BlockingQueue<CallRunner>> queues;
+ protected final Random balancer = new Random();
+
+ public MultipleQueueRpcExecutor(final String name, final int handlerCount,
+ final int numQueues, final int maxQueueLength) {
+ this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength);
+ }
+
+ public MultipleQueueRpcExecutor(final String name, final int handlerCount,
+ final int numQueues,
+ final Class<? extends BlockingQueue> queueClass, Object... initargs) {
+ super(name, Math.max(handlerCount, numQueues));
+ queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
+ initializeQueues(numQueues, queueClass, initargs);
+ }
+
+ protected void initializeQueues(final int numQueues,
+ final Class<? extends BlockingQueue> queueClass, Object... initargs) {
+ for (int i = 0; i < numQueues; ++i) {
+ queues.add((BlockingQueue<CallRunner>)
+ ReflectionUtils.newInstance(queueClass, initargs));
+ }
+ }
+
+ @Override
+ public void dispatch(final CallRunner callTask) throws InterruptedException {
+ int queueIndex = balancer.nextInt(queues.size());
+ queues.get(queueIndex).put(callTask);
+ }
+
+ @Override
+ public int getQueueLength() {
+ int length = 0;
+ for (final BlockingQueue<CallRunner> queue: queues) {
+ length += queue.size();
+ }
+ return length;
+ }
+
+ @Override
+ protected List<BlockingQueue<CallRunner>> getQueues() {
+ return queues;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9a6a59c7/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
new file mode 100644
index 0000000..1eb1a22
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.Message;
+
+/**
+ * RPC Executor that uses different queues for reads and writes.
+ * Each handler is bound to a single queue and there is no stealing.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class RWQueueRpcExecutor extends RpcExecutor {
+ private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class);
+
+ private final List<BlockingQueue<CallRunner>> queues;
+ private final Random balancer = new Random();
+ private final int writeHandlersCount;
+ private final int readHandlersCount;
+ private final int numWriteQueues;
+ private final int numReadQueues;
+
+ public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+ final float readShare, final int maxQueueLength) {
+ this(name, handlerCount, numQueues, readShare, maxQueueLength,
+ LinkedBlockingQueue.class);
+ }
+
+ public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+ final float readShare, final int maxQueueLength,
+ final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
+ this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
+ calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare),
+ LinkedBlockingQueue.class, new Object[] {maxQueueLength},
+ readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
+ }
+
+ public RWQueueRpcExecutor(final String name, final int writeHandlers, final int readHandlers,
+ final int numWriteQueues, final int numReadQueues,
+ final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
+ final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
+ super(name, Math.max(writeHandlers + readHandlers, numWriteQueues + numReadQueues));
+
+ this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
+ this.readHandlersCount = Math.max(readHandlers, numReadQueues);
+ this.numWriteQueues = numWriteQueues;
+ this.numReadQueues = numReadQueues;
+
+ queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
+ LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount +
+ " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount);
+
+ for (int i = 0; i < numWriteQueues; ++i) {
+ queues.add((BlockingQueue<CallRunner>)
+ ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
+ }
+
+ for (int i = 0; i < numReadQueues; ++i) {
+ queues.add((BlockingQueue<CallRunner>)
+ ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
+ }
+ }
+
+ @Override
+ protected void startHandlers(final int port) {
+ startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port);
+ startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port);
+ }
+
+ @Override
+ public void dispatch(final CallRunner callTask) throws InterruptedException {
+ RpcServer.Call call = callTask.getCall();
+ int queueIndex;
+ if (isWriteRequest(call.getHeader(), call.param)) {
+ queueIndex = balancer.nextInt(numWriteQueues);
+ } else {
+ queueIndex = numWriteQueues + balancer.nextInt(numReadQueues);
+ }
+ queues.get(queueIndex).put(callTask);
+ }
+
+ private boolean isWriteRequest(final RequestHeader header, final Message param) {
+ // TODO: Is there a better way to do this?
+ String methodName = header.getMethodName();
+ if (methodName.equalsIgnoreCase("multi") && param instanceof MultiRequest) {
+ MultiRequest multi = (MultiRequest)param;
+ for (RegionAction regionAction : multi.getRegionActionList()) {
+ for (Action action: regionAction.getActionList()) {
+ if (action.hasMutation()) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public int getQueueLength() {
+ int length = 0;
+ for (final BlockingQueue<CallRunner> queue: queues) {
+ length += queue.size();
+ }
+ return length;
+ }
+
+ @Override
+ protected List<BlockingQueue<CallRunner>> getQueues() {
+ return queues;
+ }
+
+ /*
+ * Calculate the number of writers based on the "total count" and the read share.
+ * You'll get at least one writer.
+ */
+ private static int calcNumWriters(final int count, final float readShare) {
+ return Math.max(1, count - Math.max(1, (int)Math.round(count * readShare)));
+ }
+
+ /*
+ * Calculate the number of readers based on the "total count" and the read share.
+ * You'll get at least one reader, provided the total count is at least 2.
+ */
+ private static int calcNumReaders(final int count, final float readShare) {
+ return count - calcNumWriters(count, readShare);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9a6a59c7/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
new file mode 100644
index 0000000..84a71ea
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class RpcExecutor {
+ private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
+
+ private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
+ private final List<Thread> handlers;
+ private final int handlerCount;
+ private final String name;
+
+ private boolean running;
+
+ public RpcExecutor(final String name, final int handlerCount) {
+ this.handlers = new ArrayList<Thread>(handlerCount);
+ this.handlerCount = handlerCount;
+ this.name = Strings.nullToEmpty(name);
+ }
+
+ public void start(final int port) {
+ running = true;
+ startHandlers(port);
+ }
+
+ public void stop() {
+ running = false;
+ for (Thread handler : handlers) {
+ handler.interrupt();
+ }
+ }
+
+ public int getActiveHandlerCount() {
+ return activeHandlerCount.get();
+ }
+
+ /** Returns the length of the pending queue */
+ public abstract int getQueueLength();
+
+ /** Add the request to the executor queue */
+ public abstract void dispatch(final CallRunner callTask) throws InterruptedException;
+
+ /** Returns the list of request queues */
+ protected abstract List<BlockingQueue<CallRunner>> getQueues();
+
+ protected void startHandlers(final int port) {
+ List<BlockingQueue<CallRunner>> callQueues = getQueues();
+ startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port);
+ }
+
+ protected void startHandlers(final String nameSuffix, final int numHandlers,
+ final List<BlockingQueue<CallRunner>> callQueues,
+ final int qindex, final int qsize, final int port) {
+ final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
+ for (int i = 0; i < numHandlers; i++) {
+ final int index = qindex + (i % qsize);
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ consumerLoop(callQueues.get(index));
+ }
+ });
+ t.setDaemon(true);
+ t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
+ ",queue=" + index + ",port=" + port);
+ t.start();
+ LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
+ handlers.add(t);
+ }
+ }
+
+ protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
+ boolean interrupted = false;
+ try {
+ while (running) {
+ try {
+ CallRunner task = myQueue.take();
+ try {
+ activeHandlerCount.incrementAndGet();
+ task.run();
+ } finally {
+ activeHandlerCount.decrementAndGet();
+ }
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9a6a59c7/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 764900b..27f5427 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -17,7 +17,11 @@
*/
package org.apache.hadoop.hbase.ipc;
+import java.io.IOException;
+
+import java.util.Random;
import java.util.Comparator;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -43,7 +47,11 @@ import com.google.common.collect.Lists;
public class SimpleRpcScheduler implements RpcScheduler {
public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
- /** If set to true, uses a priority queue and deprioritize long-running scans */
+ public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "ipc.server.callqueue.read.share";
+ public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
+ "ipc.server.callqueue.handler.factor";
+
+ /** If set to 'deadline', uses a priority queue and deprioritizes long-running scans */
public static final String CALL_QUEUE_TYPE_CONF_KEY = "ipc.server.callqueue.type";
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
@@ -82,16 +90,11 @@ public class SimpleRpcScheduler implements RpcScheduler {
}
private int port;
- private final int handlerCount;
- private final int priorityHandlerCount;
- private final int replicationHandlerCount;
private final PriorityFunction priority;
- final BlockingQueue<CallRunner> callQueue;
- final BlockingQueue<CallRunner> priorityCallQueue;
- final BlockingQueue<CallRunner> replicationQueue;
- private volatile boolean running = false;
- private final List<Thread> handlers = Lists.newArrayList();
- private AtomicInteger activeHandlerCount = new AtomicInteger(0);
+ private final RpcExecutor callExecutor;
+ private final RpcExecutor priorityExecutor;
+ private final RpcExecutor replicationExecutor;
+
/** What level a high priority call is at. */
private final int highPriorityLevel;
@@ -112,25 +115,53 @@ public class SimpleRpcScheduler implements RpcScheduler {
int highPriorityLevel) {
int maxQueueLength = conf.getInt("ipc.server.max.callqueue.length",
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
- this.handlerCount = handlerCount;
- this.priorityHandlerCount = priorityHandlerCount;
- this.replicationHandlerCount = replicationHandlerCount;
this.priority = priority;
this.highPriorityLevel = highPriorityLevel;
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
- LOG.debug("Using " + callQueueType + " as user call queue");
- if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
- this.callQueue = new BoundedPriorityBlockingQueue<CallRunner>(maxQueueLength,
- new CallPriorityComparator(conf, this.priority));
+ float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
+
+ float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
+ int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
+
+ LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues);
+
+ if (numCallQueues > 1 && callqReadShare > 0) {
+ // multiple read/write queues
+ if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
+ CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
+ callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
+ callqReadShare, maxQueueLength, BoundedPriorityBlockingQueue.class, callPriority);
+ } else {
+ callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
+ callqReadShare, maxQueueLength);
+ }
+ } else if (numCallQueues > 1) {
+ // multiple queues
+ if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
+ CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
+ callExecutor = new MultipleQueueRpcExecutor("default", handlerCount, numCallQueues,
+ BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
+ } else {
+ callExecutor = new MultipleQueueRpcExecutor("default", handlerCount,
+ numCallQueues, maxQueueLength);
+ }
} else {
- this.callQueue = new LinkedBlockingQueue<CallRunner>(maxQueueLength);
+ // Single queue
+ if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
+ CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
+ callExecutor = new SingleQueueRpcExecutor("default", handlerCount,
+ BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
+ } else {
+ callExecutor = new SingleQueueRpcExecutor("default", handlerCount, maxQueueLength);
+ }
}
- this.priorityCallQueue = priorityHandlerCount > 0
- ? new LinkedBlockingQueue<CallRunner>(maxQueueLength)
+
+ this.priorityExecutor = priorityHandlerCount > 0
+ ? new SingleQueueRpcExecutor("Priority", priorityHandlerCount, maxQueueLength)
: null;
- this.replicationQueue = replicationHandlerCount > 0
- ? new LinkedBlockingQueue<CallRunner>(maxQueueLength)
+ this.replicationExecutor = replicationHandlerCount > 0
+ ? new SingleQueueRpcExecutor("Replication", replicationHandlerCount, maxQueueLength)
: null;
}
@@ -141,96 +172,51 @@ public class SimpleRpcScheduler implements RpcScheduler {
@Override
public void start() {
- running = true;
- startHandlers(handlerCount, callQueue, null);
- if (priorityCallQueue != null) {
- startHandlers(priorityHandlerCount, priorityCallQueue, "Priority.");
- }
- if (replicationQueue != null) {
- startHandlers(replicationHandlerCount, replicationQueue, "Replication.");
- }
- }
-
- private void startHandlers(
- int handlerCount,
- final BlockingQueue<CallRunner> callQueue,
- String threadNamePrefix) {
- for (int i = 0; i < handlerCount; i++) {
- Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- consumerLoop(callQueue);
- }
- });
- t.setDaemon(true);
- t.setName(Strings.nullToEmpty(threadNamePrefix) + "RpcServer.handler=" + i + ",port=" + port);
- t.start();
- handlers.add(t);
- }
+ callExecutor.start(port);
+ if (priorityExecutor != null) priorityExecutor.start(port);
+ if (replicationExecutor != null) replicationExecutor.start(port);
}
@Override
public void stop() {
- running = false;
- for (Thread handler : handlers) {
- handler.interrupt();
- }
+ callExecutor.stop();
+ if (priorityExecutor != null) priorityExecutor.stop();
+ if (replicationExecutor != null) replicationExecutor.stop();
}
@Override
public void dispatch(CallRunner callTask) throws InterruptedException {
RpcServer.Call call = callTask.getCall();
int level = priority.getPriority(call.getHeader(), call.param);
- if (priorityCallQueue != null && level > highPriorityLevel) {
- priorityCallQueue.put(callTask);
- } else if (replicationQueue != null && level == HConstants.REPLICATION_QOS) {
- replicationQueue.put(callTask);
+ if (priorityExecutor != null && level > highPriorityLevel) {
+ priorityExecutor.dispatch(callTask);
+ } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
+ replicationExecutor.dispatch(callTask);
} else {
- callQueue.put(callTask); // queue the call; maybe blocked here
+ callExecutor.dispatch(callTask);
}
}
@Override
public int getGeneralQueueLength() {
- return callQueue.size();
+ return callExecutor.getQueueLength();
}
@Override
public int getPriorityQueueLength() {
- return priorityCallQueue == null ? 0 : priorityCallQueue.size();
+ return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength();
}
@Override
public int getReplicationQueueLength() {
- return replicationQueue == null ? 0 : replicationQueue.size();
+ return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
}
@Override
public int getActiveRpcHandlerCount() {
- return activeHandlerCount.get();
- }
-
- private void consumerLoop(BlockingQueue<CallRunner> myQueue) {
- boolean interrupted = false;
- try {
- while (running) {
- try {
- CallRunner task = myQueue.take();
- try {
- activeHandlerCount.incrementAndGet();
- task.run();
- } finally {
- activeHandlerCount.decrementAndGet();
- }
- } catch (InterruptedException e) {
- interrupted = true;
- }
- }
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
+ return callExecutor.getActiveHandlerCount() +
+ (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) +
+ (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9a6a59c7/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java
new file mode 100644
index 0000000..f195e0d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Lists;
+
+/**
+ * RPC Executor that uses a single queue for all the requests.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class SingleQueueRpcExecutor extends RpcExecutor {
+ private final BlockingQueue<CallRunner> queue;
+
+ public SingleQueueRpcExecutor(final String name, final int handlerCount,
+ final int maxQueueLength) {
+ this(name, handlerCount, LinkedBlockingQueue.class, maxQueueLength);
+ }
+
+ public SingleQueueRpcExecutor(final String name, final int handlerCount,
+ final Class<? extends BlockingQueue> queueClass, Object... initargs) {
+ super(name, handlerCount);
+ queue = (BlockingQueue<CallRunner>)ReflectionUtils.newInstance(queueClass, initargs);
+ }
+
+ @Override
+ public void dispatch(final CallRunner callTask) throws InterruptedException {
+ queue.put(callTask);
+ }
+
+ @Override
+ public int getQueueLength() {
+ return queue.size();
+ }
+
+ @Override
+ protected List<BlockingQueue<CallRunner>> getQueues() {
+ List<BlockingQueue<CallRunner>> list = new ArrayList<BlockingQueue<CallRunner>>(1);
+ list.add(queue);
+ return list;
+ }
+}