Posted to commits@hbase.apache.org by mb...@apache.org on 2014/08/14 21:54:26 UTC

[1/2] git commit: HBASE-11724 Add to RWQueueRpcExecutor the ability to split get and scan handlers

Repository: hbase
Updated Branches:
  refs/heads/branch-1 fc5f0a672 -> 7df6f5800
  refs/heads/master 865fae8a8 -> e1e70a7e2


HBASE-11724 Add to RWQueueRpcExecutor the ability to split get and scan handlers


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e1e70a7e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e1e70a7e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e1e70a7e

Branch: refs/heads/master
Commit: e1e70a7e2f74c45e6b58b0ccace21969a7a29358
Parents: 865fae8
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Thu Aug 14 20:52:11 2014 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Thu Aug 14 20:52:11 2014 +0100

----------------------------------------------------------------------
 .../src/main/resources/hbase-default.xml        |  45 ++++++-
 .../hadoop/hbase/ipc/RWQueueRpcExecutor.java    |  65 +++++++++-
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java    |  11 +-
 .../hbase/ipc/TestSimpleRpcScheduler.java       | 118 +++++++++++++------
 4 files changed, 194 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e1e70a7e/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 7b71896..aeb78a5 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -187,13 +187,50 @@ possible configurations would overwhelm and obscure the important.
       A value of 0 means a single queue shared between all the handlers.
       A value of 1 means that each handler has its own queue.</description>
   </property>
-  <property>
-    <name>hbase.ipc.server.callqueue.read.share</name>
+  <property>
+    <name>hbase.ipc.server.callqueue.read.ratio</name>
     <value>0</value>
     <description>Split the call queues into read and write queues.
-      A value of 0 indicate to not split the call queues.
-      A value of 0.5 means there will be the same number of read and write queues
+      The specified ratio (which should be between 0.0 and 1.0)
+      will be multiplied by the number of call queues.
+      A value of 0 indicates not to split the call queues, meaning that both read and write
+      requests will be pushed to the same set of queues.
+      A value lower than 0.5 means that there will be fewer read queues than write queues.
+      A value of 0.5 means there will be the same number of read and write queues.
+      A value greater than 0.5 means that there will be more read queues than write queues.
       A value of 1.0 means that all the queues except one are used to dispatch read requests.
+
+      Example: Given a total of 10 call queues,
+      a read.ratio of 0 means that all 10 queues will contain both read and write requests.
+      a read.ratio of 0.3 means that 3 queues will contain only read requests
+      and 7 queues will contain only write requests.
+      a read.ratio of 0.5 means that 5 queues will contain only read requests
+      and 5 queues will contain only write requests.
+      a read.ratio of 0.8 means that 8 queues will contain only read requests
+      and 2 queues will contain only write requests.
+      a read.ratio of 1 means that 9 queues will contain only read requests
+      and 1 queue will contain only write requests.
+    </description>
+  </property>
+  <property>
+    <name>hbase.ipc.server.callqueue.scan.ratio</name>
+    <value>0</value>
+    <description>Given the number of read call queues, calculated from the total number
+      of call queues multiplied by the callqueue.read.ratio, the scan.ratio property
+      will split the read call queues into short-read and long-read queues.
+      A value lower than 0.5 means that there will be fewer long-read queues than short-read queues.
+      A value of 0.5 means that there will be the same number of short-read and long-read queues.
+      A value greater than 0.5 means that there will be more long-read queues than short-read queues.
+      A value of 0 or 1 means that the same set of queues is used for gets and scans.
+
+      Example: Given a total of 8 read call queues,
+      a scan.ratio of 0 or 1 means that all 8 queues will contain both long and short read requests.
+      a scan.ratio of 0.3 means that 2 queues will contain only long-read requests
+      and 6 queues will contain only short-read requests.
+      a scan.ratio of 0.5 means that 4 queues will contain only long-read requests
+      and 4 queues will contain only short-read requests.
+      a scan.ratio of 0.8 means that 6 queues will contain only long-read requests
+      and 2 queues will contain only short-read requests.
     </description>
   </property>
   <property>

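A compact sketch of the arithmetic the two ratios imply, for readers skimming the patch. Illustrative only: the class and method names below are hypothetical, the read/write split is an approximation of the documented examples (the calcNumWriters/calcNumReaders helpers are not part of this excerpt), and the scan split mirrors the floor-based logic added to RWQueueRpcExecutor further down.

  // Hypothetical sketch of the queue split implied by read.ratio and scan.ratio.
  public final class CallQueueSplitSketch {
    /** Returns {writeQueues, shortReadQueues, scanQueues}. */
    static int[] split(int totalQueues, float readRatio, float scanRatio) {
      // Read/write split: approximates the examples above (at least one queue per side).
      int readQueues = Math.min(totalQueues - 1, Math.max(1, Math.round(totalQueues * readRatio)));
      int writeQueues = totalQueues - readQueues;
      // Scan split: same floor-based logic as the RWQueueRpcExecutor change below.
      int scanQueues = Math.max(0, (int) Math.floor(readQueues * scanRatio));
      if (readQueues - scanQueues > 0) {
        readQueues -= scanQueues;   // remaining read queues serve gets / short reads
      } else {
        scanQueues = 0;             // too few read queues: gets and scans share them
      }
      return new int[] { writeQueues, readQueues, scanQueues };
    }

    public static void main(String[] args) {
      // 10 call queues, read.ratio=0.8, scan.ratio=0.5 -> write=2 read=4 scan=4
      int[] q = split(10, 0.8f, 0.5f);
      System.out.println("write=" + q[0] + " read=" + q[1] + " scan=" + q[2]);
    }
  }
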
http://git-wip-us.apache.org/repos/asf/hbase/blob/e1e70a7e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 247b7da..602c53e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
@@ -40,6 +41,7 @@ import com.google.protobuf.Message;
 
 /**
  * RPC Executor that uses different queues for reads and writes.
+ * With the option to use different queues/executors for gets and scans.
  * Each handler has its own queue and there is no stealing.
  */
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
@@ -51,20 +53,34 @@ public class RWQueueRpcExecutor extends RpcExecutor {
   private final Random balancer = new Random();
   private final int writeHandlersCount;
   private final int readHandlersCount;
+  private final int scanHandlersCount;
   private final int numWriteQueues;
   private final int numReadQueues;
+  private final int numScanQueues;
 
   public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
       final float readShare, final int maxQueueLength) {
-    this(name, handlerCount, numQueues, readShare, maxQueueLength,
+    this(name, handlerCount, numQueues, readShare, 0, maxQueueLength, LinkedBlockingQueue.class);
+  }
+
+  public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+      final float readShare, final float scanShare, final int maxQueueLength) {
+    this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength,
       LinkedBlockingQueue.class);
   }
 
   public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
       final float readShare, final int maxQueueLength,
       final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
+    this(name, handlerCount, numQueues, readShare, 0, maxQueueLength,
+      readQueueClass, readQueueInitArgs);
+  }
+
+  public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+      final float readShare, final float scanShare, final int maxQueueLength,
+      final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
     this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
-      calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare),
+      calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,
       LinkedBlockingQueue.class, new Object[] {maxQueueLength},
       readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
   }
@@ -73,23 +89,45 @@ public class RWQueueRpcExecutor extends RpcExecutor {
       final int numWriteQueues, final int numReadQueues,
       final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
       final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
-    super(name, Math.max(writeHandlers + readHandlers, numWriteQueues + numReadQueues));
+    this(name, writeHandlers, readHandlers, numWriteQueues, numReadQueues, 0,
+      writeQueueClass, writeQueueInitArgs, readQueueClass, readQueueInitArgs);
+  }
+
+  public RWQueueRpcExecutor(final String name, int writeHandlers, int readHandlers,
+      int numWriteQueues, int numReadQueues, float scanShare,
+      final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
+      final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
+    super(name, Math.max(writeHandlers, numWriteQueues) + Math.max(readHandlers, numReadQueues));
+
+    int numScanQueues = Math.max(0, (int)Math.floor(numReadQueues * scanShare));
+    int scanHandlers = Math.max(0, (int)Math.floor(readHandlers * scanShare));
+    if ((numReadQueues - numScanQueues) > 0) {
+      numReadQueues -= numScanQueues;
+      readHandlers -= scanHandlers;
+    } else {
+      numScanQueues = 0;
+      scanHandlers = 0;
+    }
 
     this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
     this.readHandlersCount = Math.max(readHandlers, numReadQueues);
+    this.scanHandlersCount = Math.max(scanHandlers, numScanQueues);
     this.numWriteQueues = numWriteQueues;
     this.numReadQueues = numReadQueues;
+    this.numScanQueues = numScanQueues;
 
     queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
     LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount +
-              " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount);
+              " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount +
+              ((numScanQueues == 0) ? "" : " scanQueues=" + numScanQueues +
+                " scanHandlers=" + scanHandlersCount));
 
     for (int i = 0; i < numWriteQueues; ++i) {
       queues.add((BlockingQueue<CallRunner>)
         ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
     }
 
-    for (int i = 0; i < numReadQueues; ++i) {
+    for (int i = 0; i < (numReadQueues + numScanQueues); ++i) {
       queues.add((BlockingQueue<CallRunner>)
         ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
     }
@@ -99,6 +137,8 @@ public class RWQueueRpcExecutor extends RpcExecutor {
   protected void startHandlers(final int port) {
     startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port);
     startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port);
+    startHandlers(".scan", scanHandlersCount, queues,
+                  numWriteQueues + numReadQueues, numScanQueues, port);
   }
 
   @Override
@@ -107,6 +147,8 @@ public class RWQueueRpcExecutor extends RpcExecutor {
     int queueIndex;
     if (isWriteRequest(call.getHeader(), call.param)) {
       queueIndex = balancer.nextInt(numWriteQueues);
+    } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) {
+      queueIndex = numWriteQueues + numReadQueues + balancer.nextInt(numScanQueues);
     } else {
       queueIndex = numWriteQueues + balancer.nextInt(numReadQueues);
     }
@@ -126,6 +168,19 @@ public class RWQueueRpcExecutor extends RpcExecutor {
         }
       }
     }
+    if (methodName.equalsIgnoreCase("mutate")) {
+      return true;
+    }
+    return false;
+  }
+
+  private boolean isScanRequest(final RequestHeader header, final Message param) {
+    String methodName = header.getMethodName();
+    if (methodName.equalsIgnoreCase("scan")) {
+      // The first scan request will be executed as a "short read"
+      ScanRequest request = (ScanRequest)param;
+      return request.hasScannerId();
+    }
     return false;
   }
 

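A hedged usage sketch of the widened constructor surface (not part of the patch; the class name, counts and ratios are illustrative):

  import org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor;

  // Illustrative only: 30 handlers over 10 queues, 60% of the queues/handlers for reads,
  // and half of the read side further reserved for scans, via the new
  // (name, handlerCount, numQueues, readShare, scanShare, maxQueueLength) constructor.
  public class RWQueueExecutorSketch {
    public static void main(String[] args) {
      RWQueueRpcExecutor executor =
          new RWQueueRpcExecutor("default", 30, 10, 0.6f, 0.5f, 250);
      // Dispatch then routes mutate/multi writes to the write queues, "scan" calls that
      // already carry a scanner id to the scan queues, and everything else (gets and the
      // first request of each scan) to the short-read queues.
    }
  }
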
http://git-wip-us.apache.org/repos/asf/hbase/blob/e1e70a7e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 0458c00..2debe2e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -38,7 +38,10 @@ import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
 public class SimpleRpcScheduler extends RpcScheduler {
   public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
 
-  public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "hbase.ipc.server.callqueue.read.share";
+  public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
+      "hbase.ipc.server.callqueue.read.ratio";
+  public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY =
+      "hbase.ipc.server.callqueue.scan.ratio";
   public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
       "hbase.ipc.server.callqueue.handler.factor";
 
@@ -112,6 +115,7 @@ public class SimpleRpcScheduler extends RpcScheduler {
 
     String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
     float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
+    float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
 
     float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
     int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
@@ -123,10 +127,11 @@ public class SimpleRpcScheduler extends RpcScheduler {
       if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
         CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
         callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
-            callqReadShare, maxQueueLength, BoundedPriorityBlockingQueue.class, callPriority);
+            callqReadShare, callqScanShare, maxQueueLength,
+            BoundedPriorityBlockingQueue.class, callPriority);
       } else {
         callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
-            callqReadShare, maxQueueLength);
+            callqReadShare, callqScanShare, maxQueueLength);
       }
     } else {
       // multiple queues

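For operators, the two new keys are ordinary Configuration settings, picked up by SimpleRpcScheduler next to the existing handler.factor key. A minimal sketch of setting them programmatically (the class name and sample values are illustrative; they mirror what TestSimpleRpcScheduler does below):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;

  public class CallQueueConfigSketch {
    public static void main(String[] args) {
      Configuration conf = HBaseConfiguration.create();
      // One call queue per handler.
      conf.setFloat("hbase.ipc.server.callqueue.handler.factor", 1.0f);
      // Half of the call queues serve reads, the other half writes.
      conf.setFloat("hbase.ipc.server.callqueue.read.ratio", 0.5f);
      // Half of the read queues are dedicated to scans that already carry a scanner id.
      conf.setFloat("hbase.ipc.server.callqueue.scan.ratio", 0.5f);
      // SimpleRpcScheduler reads these via CALL_QUEUE_READ_SHARE_CONF_KEY /
      // CALL_QUEUE_SCAN_SHARE_CONF_KEY and passes them to RWQueueRpcExecutor.
    }
  }
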
http://git-wip-us.apache.org/repos/asf/hbase/blob/e1e70a7e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index d4a79b6..9d51915 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.ipc.RpcServer.Call;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Before;
 import org.junit.Test;
@@ -46,6 +47,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
@@ -185,39 +187,9 @@ public class TestSimpleRpcScheduler {
       when(priority.getDeadline(eq(hugeHead), any(Message.class))).thenReturn(100L);
 
       final ArrayList<Integer> work = new ArrayList<Integer>();
-
-      doAnswer(new Answer<Object>() {
-        @Override
-        public Object answer(InvocationOnMock invocation) {
-          synchronized (work) {
-            work.add(10);
-          }
-          Threads.sleepWithoutInterrupt(100);
-          return null;
-        }
-      }).when(smallCallTask).run();
-
-      doAnswer(new Answer<Object>() {
-        @Override
-        public Object answer(InvocationOnMock invocation) {
-          synchronized (work) {
-            work.add(50);
-          }
-          Threads.sleepWithoutInterrupt(100);
-          return null;
-        }
-      }).when(largeCallTask).run();
-
-      doAnswer(new Answer<Object>() {
-        @Override
-        public Object answer(InvocationOnMock invocation) {
-          synchronized (work) {
-            work.add(100);
-          }
-          Threads.sleepWithoutInterrupt(100);
-          return null;
-        }
-      }).when(hugeCallTask).run();
+      doAnswerTaskExecution(smallCallTask, work, 10, 250);
+      doAnswerTaskExecution(largeCallTask, work, 50, 250);
+      doAnswerTaskExecution(hugeCallTask, work, 100, 250);
 
       scheduler.dispatch(smallCallTask);
       scheduler.dispatch(smallCallTask);
@@ -253,4 +225,84 @@ public class TestSimpleRpcScheduler {
       scheduler.stop();
     }
   }
+
+  @Test
+  public void testScanQueues() throws Exception {
+    Configuration schedConf = HBaseConfiguration.create();
+    schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
+    schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
+    schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
+
+    PriorityFunction priority = mock(PriorityFunction.class);
+    when(priority.getPriority(any(RequestHeader.class), any(Message.class)))
+      .thenReturn(HConstants.NORMAL_QOS);
+
+    RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority,
+                                                    HConstants.QOS_THRESHOLD);
+    try {
+      scheduler.start();
+
+      CallRunner putCallTask = mock(CallRunner.class);
+      RpcServer.Call putCall = mock(RpcServer.Call.class);
+      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
+      when(putCallTask.getCall()).thenReturn(putCall);
+      when(putCall.getHeader()).thenReturn(putHead);
+
+      CallRunner getCallTask = mock(CallRunner.class);
+      RpcServer.Call getCall = mock(RpcServer.Call.class);
+      RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
+      when(getCallTask.getCall()).thenReturn(getCall);
+      when(getCall.getHeader()).thenReturn(getHead);
+
+      CallRunner scanCallTask = mock(CallRunner.class);
+      RpcServer.Call scanCall = mock(RpcServer.Call.class);
+      scanCall.param = ScanRequest.newBuilder().setScannerId(1).build();
+      RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
+      when(scanCallTask.getCall()).thenReturn(scanCall);
+      when(scanCall.getHeader()).thenReturn(scanHead);
+
+      ArrayList<Integer> work = new ArrayList<Integer>();
+      doAnswerTaskExecution(putCallTask, work, 1, 1000);
+      doAnswerTaskExecution(getCallTask, work, 2, 1000);
+      doAnswerTaskExecution(scanCallTask, work, 3, 1000);
+
+      // There are 3 queues: [puts], [gets], [scans]
+      // so the calls will be interleaved
+      scheduler.dispatch(putCallTask);
+      scheduler.dispatch(putCallTask);
+      scheduler.dispatch(putCallTask);
+      scheduler.dispatch(getCallTask);
+      scheduler.dispatch(getCallTask);
+      scheduler.dispatch(getCallTask);
+      scheduler.dispatch(scanCallTask);
+      scheduler.dispatch(scanCallTask);
+      scheduler.dispatch(scanCallTask);
+
+      while (work.size() < 6) {
+        Threads.sleepWithoutInterrupt(100);
+      }
+
+      for (int i = 0; i < work.size() - 2; i += 3) {
+        assertNotEquals(work.get(i + 0), work.get(i + 1));
+        assertNotEquals(work.get(i + 0), work.get(i + 2));
+        assertNotEquals(work.get(i + 1), work.get(i + 2));
+      }
+    } finally {
+      scheduler.stop();
+    }
+  }
+
+  private void doAnswerTaskExecution(final CallRunner callTask,
+      final ArrayList<Integer> results, final int value, final int sleepInterval) {
+    doAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) {
+        synchronized (results) {
+          results.add(value);
+        }
+        Threads.sleepWithoutInterrupt(sleepInterval);
+        return null;
+      }
+    }).when(callTask).run();
+  }
 }


[2/2] git commit: HBASE-11724 Add to RWQueueRpcExecutor the ability to split get and scan handlers

Posted by mb...@apache.org.
HBASE-11724 Add to RWQueueRpcExecutor the ability to split get and scan handlers


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7df6f580
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7df6f580
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7df6f580

Branch: refs/heads/branch-1
Commit: 7df6f580010e1b934ba0cc2b14395185eff8948e
Parents: fc5f0a6
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Thu Aug 14 20:52:11 2014 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Thu Aug 14 20:52:40 2014 +0100

----------------------------------------------------------------------
 .../src/main/resources/hbase-default.xml        |  45 ++++++-
 .../hadoop/hbase/ipc/RWQueueRpcExecutor.java    |  65 +++++++++-
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java    |  11 +-
 .../hbase/ipc/TestSimpleRpcScheduler.java       | 118 +++++++++++++------
 4 files changed, 194 insertions(+), 45 deletions(-)
----------------------------------------------------------------------

