Posted to commits@hbase.apache.org by mb...@apache.org on 2014/07/01 10:35:29 UTC

[3/3] git commit: HBASE-11355 a couple of callQueue related improvements

HBASE-11355 a couple of callQueue related improvements


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9a6a59c7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9a6a59c7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9a6a59c7

Branch: refs/heads/branch-1
Commit: 9a6a59c7b7d8357c50fd32f01d0ca21911db3da2
Parents: 0167558
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Tue Jul 1 09:29:07 2014 +0200
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Tue Jul 1 10:33:39 2014 +0200

----------------------------------------------------------------------
 .../hadoop/hbase/util/ReflectionUtils.java      |  52 +++++-
 .../src/main/resources/hbase-default.xml        |  16 ++
 .../hbase/ipc/MultipleQueueRpcExecutor.java     |  87 ++++++++++
 .../hadoop/hbase/ipc/RWQueueRpcExecutor.java    | 163 +++++++++++++++++++
 .../apache/hadoop/hbase/ipc/RpcExecutor.java    | 128 +++++++++++++++
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java    | 154 ++++++++----------
 .../hbase/ipc/SingleQueueRpcExecutor.java       |  71 ++++++++
 7 files changed, 582 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9a6a59c7/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
index cda5424..6629868 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.util;
 
+import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -26,13 +27,23 @@ import org.apache.hadoop.classification.InterfaceAudience;
 public class ReflectionUtils {
   @SuppressWarnings("unchecked")
   public static <T> T instantiateWithCustomCtor(String className,
-    Class<? >[] ctorArgTypes, Object[] ctorArgs) {
+      Class<? >[] ctorArgTypes, Object[] ctorArgs) {
     try {
       Class<? extends T> resultType = (Class<? extends T>) Class.forName(className);
-      return resultType.getDeclaredConstructor(ctorArgTypes).newInstance(ctorArgs);
+      Constructor<? extends T> ctor = resultType.getDeclaredConstructor(ctorArgTypes);
+      return instantiate(className, ctor, ctorArgs);
     } catch (ClassNotFoundException e) {
       throw new UnsupportedOperationException(
           "Unable to find " + className, e);
+    } catch (NoSuchMethodException e) {
+      throw new UnsupportedOperationException(
+          "Unable to find suitable constructor for class " + className, e);
+    }
+  }
+
+  private static <T> T instantiate(final String className, Constructor<T> ctor, Object[] ctorArgs) {
+    try {
+      return ctor.newInstance(ctorArgs);
     } catch (IllegalAccessException e) {
       throw new UnsupportedOperationException(
           "Unable to access specified class " + className, e);
@@ -42,9 +53,40 @@ public class ReflectionUtils {
     } catch (InvocationTargetException e) {
       throw new UnsupportedOperationException(
           "Constructor threw an exception for " + className, e);
-    } catch (NoSuchMethodException e) {
-      throw new UnsupportedOperationException(
-          "Unable to find suitable constructor for class " + className, e);
     }
   }
+
+  @SuppressWarnings("unchecked")
+  public static <T> T newInstance(Class<T> type, Object... params) {
+    return instantiate(type.getName(), findConstructor(type, params), params);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T> Constructor<T> findConstructor(Class<T> type, Object... paramTypes) {
+    Constructor<T>[] constructors = (Constructor<T>[])type.getConstructors();
+    for (Constructor<T> ctor : constructors) {
+      Class<?>[] ctorParamTypes = ctor.getParameterTypes();
+      if (ctorParamTypes.length != paramTypes.length) {
+        continue;
+      }
+
+      boolean match = true;
+      for (int i = 0; i < ctorParamTypes.length && match; ++i) {
+        Class<?> paramType = paramTypes[i].getClass();
+        match = (!ctorParamTypes[i].isPrimitive()) ? ctorParamTypes[i].isAssignableFrom(paramType) :
+                  ((int.class.equals(ctorParamTypes[i]) && Integer.class.equals(paramType)) ||
+                   (long.class.equals(ctorParamTypes[i]) && Long.class.equals(paramType)) ||
+                   (char.class.equals(ctorParamTypes[i]) && Character.class.equals(paramType)) ||
+                   (short.class.equals(ctorParamTypes[i]) && Short.class.equals(paramType)) ||
+                   (boolean.class.equals(ctorParamTypes[i]) && Boolean.class.equals(paramType)) ||
+                   (byte.class.equals(ctorParamTypes[i]) && Byte.class.equals(paramType)));
+      }
+
+      if (match) {
+        return ctor;
+      }
+    }
+    throw new UnsupportedOperationException(
+      "Unable to find suitable constructor for class " + type.getName());
+  }
 }
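
The executors introduced below rely on this new ReflectionUtils.newInstance(): findConstructor() scans the public constructors and accepts a primitive parameter when the supplied argument is the matching boxed type. A minimal sketch of that usage, assuming hbase-common is on the classpath (the class name and capacity value are just for illustration):

  import java.util.concurrent.BlockingQueue;
  import java.util.concurrent.LinkedBlockingQueue;
  import org.apache.hadoop.hbase.util.ReflectionUtils;

  public class NewInstanceSketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
      // The boxed Integer argument (100) matches LinkedBlockingQueue(int capacity)
      // through the int/Integer case in findConstructor().
      BlockingQueue<Runnable> queue =
          (BlockingQueue<Runnable>) ReflectionUtils.newInstance(LinkedBlockingQueue.class, 100);
      System.out.println("remaining capacity: " + queue.remainingCapacity());  // 100
    }
  }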

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a6a59c7/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 43723f8..70d20a7 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -181,6 +181,22 @@ possible configurations would overwhelm and obscure the important.
     Same property is used by the Master for count of master handlers.</description>
   </property>
   <property>
+    <name>ipc.server.callqueue.handler.factor</name>
+    <value>0.1</value>
+    <description>Factor to determine the number of call queues.
+      A value of 0 means a single queue shared between all the handlers.
+      A value of 1 means that each handler has its own queue.</description>
+  </property>
+  <property>
+    <name>ipc.server.callqueue.read.share</name>
+    <value>0</value>
+    <description>Split the call queues into read and write queues.
+      A value of 0 indicates that the call queues are not split.
+      A value of 0.5 means there will be the same number of read and write queues.
+      A value of 1.0 means that all the queues except one are used to dispatch read requests.
+    </description>
+  </property>
+  <property>
     <name>hbase.regionserver.msginterval</name>
     <value>3000</value>
     <description>Interval between messages from the RegionServer to Master

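Together, these two properties decide how many call queues the scheduler creates and whether those queues are split between reads and writes (the split itself is handled by RWQueueRpcExecutor further down). A small standalone sketch of the sizing formula used in SimpleRpcScheduler, with an illustrative handler count (the class name is just for this example):

  public class CallQueueCountSketch {
    public static void main(String[] args) {
      int handlerCount = 30;        // illustrative RPC handler count
      float handlerFactor = 0.1f;   // ipc.server.callqueue.handler.factor
      // Same formula SimpleRpcScheduler uses to size the call queue pool:
      int numCallQueues = Math.max(1, Math.round(handlerCount * handlerFactor));
      System.out.println(numCallQueues + " call queues for " + handlerCount + " handlers");  // 3
      // A non-zero ipc.server.callqueue.read.share then splits those queues into
      // write and read groups, with at least one queue on each side once there
      // is more than one queue.
    }
  }
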
http://git-wip-us.apache.org/repos/asf/hbase/blob/9a6a59c7/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java
new file mode 100644
index 0000000..ab14906
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+import com.google.common.collect.Lists;
+
+/**
+ * RPC Executor that dispatches the requests on multiple queues.
+ * Each handler has its own queue and there is no stealing.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MultipleQueueRpcExecutor extends RpcExecutor {
+  protected final List<BlockingQueue<CallRunner>> queues;
+  protected final Random balancer = new Random();
+
+  public MultipleQueueRpcExecutor(final String name, final int handlerCount,
+      final int numQueues, final int maxQueueLength) {
+    this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength);
+  }
+
+  public MultipleQueueRpcExecutor(final String name, final int handlerCount,
+      final int numQueues,
+      final Class<? extends BlockingQueue> queueClass, Object... initargs) {
+    super(name, Math.max(handlerCount, numQueues));
+    queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
+    initializeQueues(numQueues, queueClass, initargs);
+  }
+
+  protected void initializeQueues(final int numQueues,
+      final Class<? extends BlockingQueue> queueClass, Object... initargs) {
+    for (int i = 0; i < numQueues; ++i) {
+      queues.add((BlockingQueue<CallRunner>)
+        ReflectionUtils.newInstance(queueClass, initargs));
+    }
+  }
+
+  @Override
+  public void dispatch(final CallRunner callTask) throws InterruptedException {
+    int queueIndex = balancer.nextInt(queues.size());
+    queues.get(queueIndex).put(callTask);
+  }
+
+  @Override
+  public int getQueueLength() {
+    int length = 0;
+    for (final BlockingQueue<CallRunner> queue: queues) {
+      length += queue.size();
+    }
+    return length;
+  }
+
+  @Override
+  protected List<BlockingQueue<CallRunner>> getQueues() {
+    return queues;
+  }
+}
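
The dispatch policy here is just a uniform random pick over the fixed set of queues, and once a call lands on a queue only the handlers bound to that queue will serve it. A tiny self-contained illustration of that policy in plain Java (not HBase code; the queue count and payloads are made up):

  import java.util.ArrayList;
  import java.util.List;
  import java.util.Random;
  import java.util.concurrent.BlockingQueue;
  import java.util.concurrent.LinkedBlockingQueue;

  public class RandomDispatchSketch {
    public static void main(String[] args) throws InterruptedException {
      int numQueues = 3;
      List<BlockingQueue<String>> queues = new ArrayList<BlockingQueue<String>>(numQueues);
      for (int i = 0; i < numQueues; ++i) {
        queues.add(new LinkedBlockingQueue<String>(100));
      }
      Random balancer = new Random();
      for (int call = 0; call < 12; ++call) {
        // Same policy as MultipleQueueRpcExecutor.dispatch(): random queue, no stealing.
        queues.get(balancer.nextInt(queues.size())).put("call-" + call);
      }
      for (int i = 0; i < numQueues; ++i) {
        System.out.println("queue " + i + " holds " + queues.get(i).size() + " calls");
      }
    }
  }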

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a6a59c7/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
new file mode 100644
index 0000000..1eb1a22
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.Message;
+
+/**
+ * RPC Executor that uses different queues for reads and writes.
+ * Each handler has its own queue and there is no stealing.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class RWQueueRpcExecutor extends RpcExecutor {
+  private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class);
+
+  private final List<BlockingQueue<CallRunner>> queues;
+  private final Random balancer = new Random();
+  private final int writeHandlersCount;
+  private final int readHandlersCount;
+  private final int numWriteQueues;
+  private final int numReadQueues;
+
+  public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+      final float readShare, final int maxQueueLength) {
+    this(name, handlerCount, numQueues, readShare, maxQueueLength,
+      LinkedBlockingQueue.class);
+  }
+
+  public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+      final float readShare, final int maxQueueLength,
+      final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
+    this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
+      calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare),
+      LinkedBlockingQueue.class, new Object[] {maxQueueLength},
+      readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
+  }
+
+  public RWQueueRpcExecutor(final String name, final int writeHandlers, final int readHandlers,
+      final int numWriteQueues, final int numReadQueues,
+      final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
+      final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
+    super(name, Math.max(writeHandlers + readHandlers, numWriteQueues + numReadQueues));
+
+    this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
+    this.readHandlersCount = Math.max(readHandlers, numReadQueues);
+    this.numWriteQueues = numWriteQueues;
+    this.numReadQueues = numReadQueues;
+
+    queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
+    LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount +
+              " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount);
+
+    for (int i = 0; i < numWriteQueues; ++i) {
+      queues.add((BlockingQueue<CallRunner>)
+        ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
+    }
+
+    for (int i = 0; i < numReadQueues; ++i) {
+      queues.add((BlockingQueue<CallRunner>)
+        ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
+    }
+  }
+
+  @Override
+  protected void startHandlers(final int port) {
+    startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port);
+    startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port);
+  }
+
+  @Override
+  public void dispatch(final CallRunner callTask) throws InterruptedException {
+    RpcServer.Call call = callTask.getCall();
+    int queueIndex;
+    if (isWriteRequest(call.getHeader(), call.param)) {
+      queueIndex = balancer.nextInt(numWriteQueues);
+    } else {
+      queueIndex = numWriteQueues + balancer.nextInt(numReadQueues);
+    }
+    queues.get(queueIndex).put(callTask);
+  }
+
+  private boolean isWriteRequest(final RequestHeader header, final Message param) {
+    // TODO: Is there a better way to do this?
+    String methodName = header.getMethodName();
+    if (methodName.equalsIgnoreCase("multi") && param instanceof MultiRequest) {
+      MultiRequest multi = (MultiRequest)param;
+      for (RegionAction regionAction : multi.getRegionActionList()) {
+        for (Action action: regionAction.getActionList()) {
+          if (action.hasMutation()) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int getQueueLength() {
+    int length = 0;
+    for (final BlockingQueue<CallRunner> queue: queues) {
+      length += queue.size();
+    }
+    return length;
+  }
+
+  @Override
+  protected List<BlockingQueue<CallRunner>> getQueues() {
+    return queues;
+  }
+
+  /*
+   * Calculate the number of writers based on the "total count" and the read share.
+   * You'll get at least one writer.
+   */
+  private static int calcNumWriters(final int count, final float readShare) {
+    return Math.max(1, count - Math.max(1, (int)Math.round(count * readShare)));
+  }
+
+  /*
+   * Calculate the number of readers based on the "total count" and the read share.
+   * You'll get at least one reader.
+   */
+  private static int calcNumReaders(final int count, final float readShare) {
+    return count - calcNumWriters(count, readShare);
+  }
+}
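
The split between write and read handlers/queues comes entirely from calcNumWriters() and calcNumReaders(): the read share is carved out first and the writers get the remainder, but never fewer than one. A few worked values, using a standalone copy of the two formulas (for illustration only):

  public class ReadWriteSplitSketch {
    // Same formulas as RWQueueRpcExecutor.calcNumWriters()/calcNumReaders().
    static int calcNumWriters(int count, float readShare) {
      return Math.max(1, count - Math.max(1, Math.round(count * readShare)));
    }
    static int calcNumReaders(int count, float readShare) {
      return count - calcNumWriters(count, readShare);
    }

    public static void main(String[] args) {
      // 10 handlers, readShare 0.5 -> 5 writers, 5 readers
      System.out.println(calcNumWriters(10, 0.5f) + "/" + calcNumReaders(10, 0.5f));
      // 10 handlers, readShare 1.0 -> 1 writer, 9 readers
      //   (matches "all the queues except one are used to dispatch read requests")
      System.out.println(calcNumWriters(10, 1.0f) + "/" + calcNumReaders(10, 1.0f));
      // 10 handlers, readShare 0.2 -> 8 writers, 2 readers
      System.out.println(calcNumWriters(10, 0.2f) + "/" + calcNumReaders(10, 0.2f));
    }
  }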

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a6a59c7/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
new file mode 100644
index 0000000..84a71ea
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class RpcExecutor {
+  private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
+
+  private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
+  private final List<Thread> handlers;
+  private final int handlerCount;
+  private final String name;
+
+  private boolean running;
+
+  public RpcExecutor(final String name, final int handlerCount) {
+    this.handlers = new ArrayList<Thread>(handlerCount);
+    this.handlerCount = handlerCount;
+    this.name = Strings.nullToEmpty(name);
+  }
+
+  public void start(final int port) {
+    running = true;
+    startHandlers(port);
+  }
+
+  public void stop() {
+    running = false;
+    for (Thread handler : handlers) {
+      handler.interrupt();
+    }
+  }
+
+  public int getActiveHandlerCount() {
+    return activeHandlerCount.get();
+  }
+
+  /** Returns the length of the pending queue */
+  public abstract int getQueueLength();
+
+  /** Add the request to the executor queue */
+  public abstract void dispatch(final CallRunner callTask) throws InterruptedException;
+
+  /** Returns the list of request queues */
+  protected abstract List<BlockingQueue<CallRunner>> getQueues();
+
+  protected void startHandlers(final int port) {
+    List<BlockingQueue<CallRunner>> callQueues = getQueues();
+    startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port);
+  }
+
+  protected void startHandlers(final String nameSuffix, final int numHandlers,
+      final List<BlockingQueue<CallRunner>> callQueues,
+      final int qindex, final int qsize, final int port) {
+    final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
+    for (int i = 0; i < numHandlers; i++) {
+      final int index = qindex + (i % qsize);
+      Thread t = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          consumerLoop(callQueues.get(index));
+        }
+      });
+      t.setDaemon(true);
+      t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
+          ",queue=" + index + ",port=" + port);
+      t.start();
+      LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
+      handlers.add(t);
+    }
+  }
+
+  protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
+    boolean interrupted = false;
+    try {
+      while (running) {
+        try {
+          CallRunner task = myQueue.take();
+          try {
+            activeHandlerCount.incrementAndGet();
+            task.run();
+          } finally {
+            activeHandlerCount.decrementAndGet();
+          }
+        } catch (InterruptedException e) {
+          interrupted = true;
+        }
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a6a59c7/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 764900b..27f5427 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -17,7 +17,11 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import java.io.IOException;
+
+import java.util.Random;
 import java.util.Comparator;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -43,7 +47,11 @@ import com.google.common.collect.Lists;
 public class SimpleRpcScheduler implements RpcScheduler {
   public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
 
-  /** If set to true, uses a priority queue and deprioritize long-running scans */
+  public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "ipc.server.callqueue.read.share";
+  public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
+      "ipc.server.callqueue.handler.factor";
+
+  /** If set to 'deadline', uses a priority queue and deprioritizes long-running scans */
   public static final String CALL_QUEUE_TYPE_CONF_KEY = "ipc.server.callqueue.type";
   public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
   public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
@@ -82,16 +90,11 @@ public class SimpleRpcScheduler implements RpcScheduler {
   }
 
   private int port;
-  private final int handlerCount;
-  private final int priorityHandlerCount;
-  private final int replicationHandlerCount;
   private final PriorityFunction priority;
-  final BlockingQueue<CallRunner> callQueue;
-  final BlockingQueue<CallRunner> priorityCallQueue;
-  final BlockingQueue<CallRunner> replicationQueue;
-  private volatile boolean running = false;
-  private final List<Thread> handlers = Lists.newArrayList();
-  private AtomicInteger activeHandlerCount = new AtomicInteger(0);
+  private final RpcExecutor callExecutor;
+  private final RpcExecutor priorityExecutor;
+  private final RpcExecutor replicationExecutor;
+
   /** What level a high priority call is at. */
   private final int highPriorityLevel;
 
@@ -112,25 +115,53 @@ public class SimpleRpcScheduler implements RpcScheduler {
       int highPriorityLevel) {
     int maxQueueLength = conf.getInt("ipc.server.max.callqueue.length",
         handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
-    this.handlerCount = handlerCount;
-    this.priorityHandlerCount = priorityHandlerCount;
-    this.replicationHandlerCount = replicationHandlerCount;
     this.priority = priority;
     this.highPriorityLevel = highPriorityLevel;
 
     String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
-    LOG.debug("Using " + callQueueType + " as user call queue");
-    if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
-      this.callQueue = new BoundedPriorityBlockingQueue<CallRunner>(maxQueueLength,
-          new CallPriorityComparator(conf, this.priority));
+    float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
+
+    float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
+    int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
+
+    LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues);
+
+    if (numCallQueues > 1 && callqReadShare > 0) {
+      // multiple read/write queues
+      if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
+        CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
+        callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
+            callqReadShare, maxQueueLength, BoundedPriorityBlockingQueue.class, callPriority);
+      } else {
+        callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
+            callqReadShare, maxQueueLength);
+      }
+    } else if (numCallQueues > 1) {
+      // multiple queues
+      if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
+        CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
+        callExecutor = new MultipleQueueRpcExecutor("default", handlerCount, numCallQueues,
+            BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
+      } else {
+        callExecutor = new MultipleQueueRpcExecutor("default", handlerCount,
+            numCallQueues, maxQueueLength);
+      }
     } else {
-      this.callQueue = new LinkedBlockingQueue<CallRunner>(maxQueueLength);
+      // Single queue
+      if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
+        CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
+        callExecutor = new SingleQueueRpcExecutor("default", handlerCount,
+            BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
+      } else {
+        callExecutor = new SingleQueueRpcExecutor("default", handlerCount, maxQueueLength);
+      }
     }
-    this.priorityCallQueue = priorityHandlerCount > 0
-        ? new LinkedBlockingQueue<CallRunner>(maxQueueLength)
+
+    this.priorityExecutor = priorityHandlerCount > 0
+        ? new SingleQueueRpcExecutor("Priority", priorityHandlerCount, maxQueueLength)
         : null;
-    this.replicationQueue = replicationHandlerCount > 0
-        ? new LinkedBlockingQueue<CallRunner>(maxQueueLength)
+    this.replicationExecutor = replicationHandlerCount > 0
+        ? new SingleQueueRpcExecutor("Replication", replicationHandlerCount, maxQueueLength)
         : null;
   }
 
@@ -141,96 +172,51 @@ public class SimpleRpcScheduler implements RpcScheduler {
 
   @Override
   public void start() {
-    running = true;
-    startHandlers(handlerCount, callQueue, null);
-    if (priorityCallQueue != null) {
-      startHandlers(priorityHandlerCount, priorityCallQueue, "Priority.");
-    }
-    if (replicationQueue != null) {
-      startHandlers(replicationHandlerCount, replicationQueue, "Replication.");
-    }
-  }
-
-  private void startHandlers(
-      int handlerCount,
-      final BlockingQueue<CallRunner> callQueue,
-      String threadNamePrefix) {
-    for (int i = 0; i < handlerCount; i++) {
-      Thread t = new Thread(new Runnable() {
-        @Override
-        public void run() {
-          consumerLoop(callQueue);
-        }
-      });
-      t.setDaemon(true);
-      t.setName(Strings.nullToEmpty(threadNamePrefix) + "RpcServer.handler=" + i + ",port=" + port);
-      t.start();
-      handlers.add(t);
-    }
+    callExecutor.start(port);
+    if (priorityExecutor != null) priorityExecutor.start(port);
+    if (replicationExecutor != null) replicationExecutor.start(port);
   }
 
   @Override
   public void stop() {
-    running = false;
-    for (Thread handler : handlers) {
-      handler.interrupt();
-    }
+    callExecutor.stop();
+    if (priorityExecutor != null) priorityExecutor.stop();
+    if (replicationExecutor != null) replicationExecutor.stop();
   }
 
   @Override
   public void dispatch(CallRunner callTask) throws InterruptedException {
     RpcServer.Call call = callTask.getCall();
     int level = priority.getPriority(call.getHeader(), call.param);
-    if (priorityCallQueue != null && level > highPriorityLevel) {
-      priorityCallQueue.put(callTask);
-    } else if (replicationQueue != null && level == HConstants.REPLICATION_QOS) {
-      replicationQueue.put(callTask);
+    if (priorityExecutor != null && level > highPriorityLevel) {
+      priorityExecutor.dispatch(callTask);
+    } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
+      replicationExecutor.dispatch(callTask);
     } else {
-      callQueue.put(callTask); // queue the call; maybe blocked here
+      callExecutor.dispatch(callTask);
     }
   }
 
   @Override
   public int getGeneralQueueLength() {
-    return callQueue.size();
+    return callExecutor.getQueueLength();
   }
 
   @Override
   public int getPriorityQueueLength() {
-    return priorityCallQueue == null ? 0 : priorityCallQueue.size();
+    return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength();
   }
 
   @Override
   public int getReplicationQueueLength() {
-    return replicationQueue == null ? 0 : replicationQueue.size();
+    return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
   }
 
   @Override
   public int getActiveRpcHandlerCount() {
-    return activeHandlerCount.get();
-  }
-
-  private void consumerLoop(BlockingQueue<CallRunner> myQueue) {
-    boolean interrupted = false;
-    try {
-      while (running) {
-        try {
-          CallRunner task = myQueue.take();
-          try {
-            activeHandlerCount.incrementAndGet();
-            task.run();
-          } finally {
-            activeHandlerCount.decrementAndGet();
-          }
-        } catch (InterruptedException e) {
-          interrupted = true;
-        }
-      }
-    } finally {
-      if (interrupted) {
-        Thread.currentThread().interrupt();
-      }
-    }
+    return callExecutor.getActiveHandlerCount() +
+           (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) +
+           (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
   }
 }
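
With this change the scheduler's behaviour is selected purely from configuration: the handler factor decides between one call queue and several, the read share decides whether those queues are split, and the existing callqueue type decides whether each queue is FIFO or deadline-ordered. A sketch of settings that would take the RWQueueRpcExecutor-with-deadline path, assuming enough handlers to yield more than one queue (values are only examples; the surrounding server wiring is omitted):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;

  public class SchedulerConfigSketch {
    public static void main(String[] args) {
      Configuration conf = HBaseConfiguration.create();
      conf.setFloat("ipc.server.callqueue.handler.factor", 0.1f); // ~1 queue per 10 handlers
      conf.setFloat("ipc.server.callqueue.read.share", 0.5f);     // half the queues serve reads
      conf.set("ipc.server.callqueue.type", "deadline");          // deprioritize long scans

      // With more than one resulting queue and a read share > 0, SimpleRpcScheduler
      // builds an RWQueueRpcExecutor backed by BoundedPriorityBlockingQueues.
      // With read.share = 0 it falls back to MultipleQueueRpcExecutor, and with a
      // factor low enough to yield a single queue, to SingleQueueRpcExecutor.
      System.out.println("callqueue.type = " + conf.get("ipc.server.callqueue.type"));
    }
  }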
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a6a59c7/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java
new file mode 100644
index 0000000..f195e0d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Lists;
+
+/**
+ * RPC Executor that uses a single queue for all the requests.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class SingleQueueRpcExecutor extends RpcExecutor {
+  private final BlockingQueue<CallRunner> queue;
+
+  public SingleQueueRpcExecutor(final String name, final int handlerCount,
+      final int maxQueueLength) {
+    this(name, handlerCount, LinkedBlockingQueue.class, maxQueueLength);
+  }
+
+  public SingleQueueRpcExecutor(final String name, final int handlerCount,
+      final Class<? extends BlockingQueue> queueClass, Object... initargs) {
+    super(name, handlerCount);
+    queue = (BlockingQueue<CallRunner>)ReflectionUtils.newInstance(queueClass, initargs);
+  }
+
+  @Override
+  public void dispatch(final CallRunner callTask) throws InterruptedException {
+    queue.put(callTask);
+  }
+
+  @Override
+  public int getQueueLength() {
+    return queue.size();
+  }
+
+  @Override
+  protected List<BlockingQueue<CallRunner>> getQueues() {
+    List<BlockingQueue<CallRunner>> list = new ArrayList<BlockingQueue<CallRunner>>(1);
+    list.add(queue);
+    return list;
+  }
+}