You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2014/08/14 21:54:26 UTC
[1/2] git commit: HBASE-11724 Add to RWQueueRpcExecutor the ability
to split get and scan handlers
Repository: hbase
Updated Branches:
refs/heads/branch-1 fc5f0a672 -> 7df6f5800
refs/heads/master 865fae8a8 -> e1e70a7e2
HBASE-11724 Add to RWQueueRpcExecutor the ability to split get and scan handlers
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e1e70a7e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e1e70a7e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e1e70a7e
Branch: refs/heads/master
Commit: e1e70a7e2f74c45e6b58b0ccace21969a7a29358
Parents: 865fae8
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Thu Aug 14 20:52:11 2014 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Thu Aug 14 20:52:11 2014 +0100
----------------------------------------------------------------------
.../src/main/resources/hbase-default.xml | 45 ++++++-
.../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 65 +++++++++-
.../hadoop/hbase/ipc/SimpleRpcScheduler.java | 11 +-
.../hbase/ipc/TestSimpleRpcScheduler.java | 118 +++++++++++++------
4 files changed, 194 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e1e70a7e/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 7b71896..aeb78a5 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -187,13 +187,50 @@ possible configurations would overwhelm and obscure the important.
A value of 0 means a single queue shared between all the handlers.
A value of 1 means that each handler has its own queue.</description>
</property>
- <property>
- <name>hbase.ipc.server.callqueue.read.share</name>
+<property>
+ <name>hbase.ipc.server.callqueue.read.ratio</name>
<value>0</value>
<description>Split the call queues into read and write queues.
- A value of 0 indicate to not split the call queues.
- A value of 0.5 means there will be the same number of read and write queues
+ The specified value (which should be between 0.0 and 1.0)
+ will be multiplied by the number of call queues.
+ A value of 0 indicates not to split the call queues, meaning that both read and write
+ requests will be pushed to the same set of queues.
+ A value lower than 0.5 means that there will be fewer read queues than write queues.
+ A value of 0.5 means there will be the same number of read and write queues.
+ A value greater than 0.5 means that there will be more read queues than write queues.
A value of 1.0 means that all the queues except one are used to dispatch read requests.
+
+ Example: Given the total number of call queues being 10
+ a read.ratio of 0 means that: the 10 queues will contain both read/write requests.
+ a read.ratio of 0.3 means that: 3 queues will contain only read requests
+ and 7 queues will contain only write requests.
+ a read.ratio of 0.5 means that: 5 queues will contain only read requests
+ and 5 queues will contain only write requests.
+ a read.ratio of 0.8 means that: 8 queues will contain only read requests
+ and 2 queues will contain only write requests.
+ a read.ratio of 1 means that: 9 queues will contain only read requests
+ and 1 queue will contain only write requests.
+ </description>
+ </property>
+ <property>
+ <name>hbase.ipc.server.callqueue.scan.ratio</name>
+ <value>0</value>
+ <description>Given the number of read call queues, calculated from the total number
+ of call queues multiplied by the callqueue.read.ratio, the scan.ratio property
+ will split the read call queues into short-read and long-read queues.
+ A value lower than 0.5 means that there will be fewer long-read queues than short-read queues.
+ A value of 0.5 means that there will be the same number of short-read and long-read queues.
+ A value greater than 0.5 means that there will be more long-read queues than short-read queues.
+ A value of 0 or 1 indicates to use the same set of queues for gets and scans.
+
+ Example: Given the total number of read call queues being 8
+ a scan.ratio of 0 or 1 means that: 8 queues will contain both long and short read requests.
+ a scan.ratio of 0.3 means that: 2 queues will contain only long-read requests
+ and 6 queues will contain only short-read requests.
+ a scan.ratio of 0.5 means that: 4 queues will contain only long-read requests
+ and 4 queues will contain only short-read requests.
+ a scan.ratio of 0.8 means that: 6 queues will contain only long-read requests
+ and 2 queues will contain only short-read requests.
</description>
</property>
<property>
http://git-wip-us.apache.org/repos/asf/hbase/blob/e1e70a7e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 247b7da..602c53e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -40,6 +41,7 @@ import com.google.protobuf.Message;
/**
* RPC Executor that uses different queues for reads and writes.
+ * With the option to use different queues/executors for gets and scans.
* Each handler has its own queue and there is no stealing.
*/
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
@@ -51,20 +53,34 @@ public class RWQueueRpcExecutor extends RpcExecutor {
private final Random balancer = new Random();
private final int writeHandlersCount;
private final int readHandlersCount;
+ private final int scanHandlersCount;
private final int numWriteQueues;
private final int numReadQueues;
+ private final int numScanQueues;
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final int maxQueueLength) {
- this(name, handlerCount, numQueues, readShare, maxQueueLength,
+ this(name, handlerCount, numQueues, readShare, maxQueueLength, 0, LinkedBlockingQueue.class);
+ }
+
+ public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+ final float readShare, final float scanShare, final int maxQueueLength) {
+ this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength,
LinkedBlockingQueue.class);
}
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final int maxQueueLength,
final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
+ this(name, handlerCount, numQueues, readShare, 0, maxQueueLength,
+ readQueueClass, readQueueInitArgs);
+ }
+
+ public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+ final float readShare, final float scanShare, final int maxQueueLength,
+ final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
- calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare),
+ calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,
LinkedBlockingQueue.class, new Object[] {maxQueueLength},
readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
}
@@ -73,23 +89,45 @@ public class RWQueueRpcExecutor extends RpcExecutor {
final int numWriteQueues, final int numReadQueues,
final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
- super(name, Math.max(writeHandlers + readHandlers, numWriteQueues + numReadQueues));
+ this(name, writeHandlers, readHandlers, numWriteQueues, numReadQueues, 0,
+ writeQueueClass, writeQueueInitArgs, readQueueClass, readQueueInitArgs);
+ }
+
+ public RWQueueRpcExecutor(final String name, int writeHandlers, int readHandlers,
+ int numWriteQueues, int numReadQueues, float scanShare,
+ final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
+ final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
+ super(name, Math.max(writeHandlers, numWriteQueues) + Math.max(readHandlers, numReadQueues));
+
+ int numScanQueues = Math.max(0, (int)Math.floor(numReadQueues * scanShare));
+ int scanHandlers = Math.max(0, (int)Math.floor(readHandlers * scanShare));
+ if ((numReadQueues - numScanQueues) > 0) {
+ numReadQueues -= numScanQueues;
+ readHandlers -= scanHandlers;
+ } else {
+ numScanQueues = 0;
+ scanHandlers = 0;
+ }
this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
this.readHandlersCount = Math.max(readHandlers, numReadQueues);
+ this.scanHandlersCount = Math.max(scanHandlers, numScanQueues);
this.numWriteQueues = numWriteQueues;
this.numReadQueues = numReadQueues;
+ this.numScanQueues = numScanQueues;
queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount +
- " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount);
+ " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount +
+ ((numScanQueues == 0) ? "" : " scanQueues=" + numScanQueues +
+ " scanHandlers=" + scanHandlersCount));
for (int i = 0; i < numWriteQueues; ++i) {
queues.add((BlockingQueue<CallRunner>)
ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
}
- for (int i = 0; i < numReadQueues; ++i) {
+ for (int i = 0; i < (numReadQueues + numScanQueues); ++i) {
queues.add((BlockingQueue<CallRunner>)
ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
}
@@ -99,6 +137,8 @@ public class RWQueueRpcExecutor extends RpcExecutor {
protected void startHandlers(final int port) {
startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port);
startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port);
+ startHandlers(".scan", scanHandlersCount, queues,
+ numWriteQueues + numReadQueues, numScanQueues, port);
}
@Override
@@ -107,6 +147,8 @@ public class RWQueueRpcExecutor extends RpcExecutor {
int queueIndex;
if (isWriteRequest(call.getHeader(), call.param)) {
queueIndex = balancer.nextInt(numWriteQueues);
+ } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) {
+ queueIndex = numWriteQueues + numReadQueues + balancer.nextInt(numScanQueues);
} else {
queueIndex = numWriteQueues + balancer.nextInt(numReadQueues);
}
@@ -126,6 +168,19 @@ public class RWQueueRpcExecutor extends RpcExecutor {
}
}
}
+ if (methodName.equalsIgnoreCase("mutate")) {
+ return true;
+ }
+ return false;
+ }
+
+ private boolean isScanRequest(final RequestHeader header, final Message param) {
+ String methodName = header.getMethodName();
+ if (methodName.equalsIgnoreCase("scan")) {
+ // The first scan request will be executed as a "short read"
+ ScanRequest request = (ScanRequest)param;
+ return request.hasScannerId();
+ }
return false;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e1e70a7e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 0458c00..2debe2e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -38,7 +38,10 @@ import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
public class SimpleRpcScheduler extends RpcScheduler {
public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
- public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "hbase.ipc.server.callqueue.read.share";
+ public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
+ "hbase.ipc.server.callqueue.read.ratio";
+ public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY =
+ "hbase.ipc.server.callqueue.scan.ratio";
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
"hbase.ipc.server.callqueue.handler.factor";
@@ -112,6 +115,7 @@ public class SimpleRpcScheduler extends RpcScheduler {
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
+ float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
@@ -123,10 +127,11 @@ public class SimpleRpcScheduler extends RpcScheduler {
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
- callqReadShare, maxQueueLength, BoundedPriorityBlockingQueue.class, callPriority);
+ callqReadShare, callqScanShare, maxQueueLength,
+ BoundedPriorityBlockingQueue.class, callPriority);
} else {
callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
- callqReadShare, maxQueueLength);
+ callqReadShare, callqScanShare, maxQueueLength);
}
} else {
// multiple queues
http://git-wip-us.apache.org/repos/asf/hbase/blob/e1e70a7e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index d4a79b6..9d51915 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Before;
import org.junit.Test;
@@ -46,6 +47,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
@@ -185,39 +187,9 @@ public class TestSimpleRpcScheduler {
when(priority.getDeadline(eq(hugeHead), any(Message.class))).thenReturn(100L);
final ArrayList<Integer> work = new ArrayList<Integer>();
-
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocation) {
- synchronized (work) {
- work.add(10);
- }
- Threads.sleepWithoutInterrupt(100);
- return null;
- }
- }).when(smallCallTask).run();
-
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocation) {
- synchronized (work) {
- work.add(50);
- }
- Threads.sleepWithoutInterrupt(100);
- return null;
- }
- }).when(largeCallTask).run();
-
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocation) {
- synchronized (work) {
- work.add(100);
- }
- Threads.sleepWithoutInterrupt(100);
- return null;
- }
- }).when(hugeCallTask).run();
+ doAnswerTaskExecution(smallCallTask, work, 10, 250);
+ doAnswerTaskExecution(largeCallTask, work, 50, 250);
+ doAnswerTaskExecution(hugeCallTask, work, 100, 250);
scheduler.dispatch(smallCallTask);
scheduler.dispatch(smallCallTask);
@@ -253,4 +225,84 @@ public class TestSimpleRpcScheduler {
scheduler.stop();
}
}
+
+ @Test
+ public void testScanQueues() throws Exception {
+ Configuration schedConf = HBaseConfiguration.create();
+ schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
+ schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
+ schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
+
+ PriorityFunction priority = mock(PriorityFunction.class);
+ when(priority.getPriority(any(RequestHeader.class), any(Message.class)))
+ .thenReturn(HConstants.NORMAL_QOS);
+
+ RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority,
+ HConstants.QOS_THRESHOLD);
+ try {
+ scheduler.start();
+
+ CallRunner putCallTask = mock(CallRunner.class);
+ RpcServer.Call putCall = mock(RpcServer.Call.class);
+ RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
+ when(putCallTask.getCall()).thenReturn(putCall);
+ when(putCall.getHeader()).thenReturn(putHead);
+
+ CallRunner getCallTask = mock(CallRunner.class);
+ RpcServer.Call getCall = mock(RpcServer.Call.class);
+ RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
+ when(getCallTask.getCall()).thenReturn(getCall);
+ when(getCall.getHeader()).thenReturn(getHead);
+
+ CallRunner scanCallTask = mock(CallRunner.class);
+ RpcServer.Call scanCall = mock(RpcServer.Call.class);
+ scanCall.param = ScanRequest.newBuilder().setScannerId(1).build();
+ RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
+ when(scanCallTask.getCall()).thenReturn(scanCall);
+ when(scanCall.getHeader()).thenReturn(scanHead);
+
+ ArrayList<Integer> work = new ArrayList<Integer>();
+ doAnswerTaskExecution(putCallTask, work, 1, 1000);
+ doAnswerTaskExecution(getCallTask, work, 2, 1000);
+ doAnswerTaskExecution(scanCallTask, work, 3, 1000);
+
+ // There are 3 queues: [puts], [gets], [scans]
+ // so the calls will be interleaved
+ scheduler.dispatch(putCallTask);
+ scheduler.dispatch(putCallTask);
+ scheduler.dispatch(putCallTask);
+ scheduler.dispatch(getCallTask);
+ scheduler.dispatch(getCallTask);
+ scheduler.dispatch(getCallTask);
+ scheduler.dispatch(scanCallTask);
+ scheduler.dispatch(scanCallTask);
+ scheduler.dispatch(scanCallTask);
+
+ while (work.size() < 6) {
+ Threads.sleepWithoutInterrupt(100);
+ }
+
+ for (int i = 0; i < work.size() - 2; i += 3) {
+ assertNotEquals(work.get(i + 0), work.get(i + 1));
+ assertNotEquals(work.get(i + 0), work.get(i + 2));
+ assertNotEquals(work.get(i + 1), work.get(i + 2));
+ }
+ } finally {
+ scheduler.stop();
+ }
+ }
+
+ private void doAnswerTaskExecution(final CallRunner callTask,
+ final ArrayList<Integer> results, final int value, final int sleepInterval) {
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) {
+ synchronized (results) {
+ results.add(value);
+ }
+ Threads.sleepWithoutInterrupt(sleepInterval);
+ return null;
+ }
+ }).when(callTask).run();
+ }
}
[2/2] git commit: HBASE-11724 Add to RWQueueRpcExecutor the ability
to split get and scan handlers
Posted by mb...@apache.org.
HBASE-11724 Add to RWQueueRpcExecutor the ability to split get and scan handlers
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7df6f580
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7df6f580
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7df6f580
Branch: refs/heads/branch-1
Commit: 7df6f580010e1b934ba0cc2b14395185eff8948e
Parents: fc5f0a6
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Thu Aug 14 20:52:11 2014 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Thu Aug 14 20:52:40 2014 +0100
----------------------------------------------------------------------
.../src/main/resources/hbase-default.xml | 45 ++++++-
.../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 65 +++++++++-
.../hadoop/hbase/ipc/SimpleRpcScheduler.java | 11 +-
.../hbase/ipc/TestSimpleRpcScheduler.java | 118 +++++++++++++------
4 files changed, 194 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/7df6f580/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index b58b6ff..d52233a 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -187,13 +187,50 @@ possible configurations would overwhelm and obscure the important.
A value of 0 means a single queue shared between all the handlers.
A value of 1 means that each handler has its own queue.</description>
</property>
- <property>
- <name>hbase.ipc.server.callqueue.read.share</name>
+<property>
+ <name>hbase.ipc.server.callqueue.read.ratio</name>
<value>0</value>
<description>Split the call queues into read and write queues.
- A value of 0 indicate to not split the call queues.
- A value of 0.5 means there will be the same number of read and write queues
+ The specified value (which should be between 0.0 and 1.0)
+ will be multiplied by the number of call queues.
+ A value of 0 indicates not to split the call queues, meaning that both read and write
+ requests will be pushed to the same set of queues.
+ A value lower than 0.5 means that there will be fewer read queues than write queues.
+ A value of 0.5 means there will be the same number of read and write queues.
+ A value greater than 0.5 means that there will be more read queues than write queues.
A value of 1.0 means that all the queues except one are used to dispatch read requests.
+
+ Example: Given the total number of call queues being 10
+ a read.ratio of 0 means that: the 10 queues will contain both read/write requests.
+ a read.ratio of 0.3 means that: 3 queues will contain only read requests
+ and 7 queues will contain only write requests.
+ a read.ratio of 0.5 means that: 5 queues will contain only read requests
+ and 5 queues will contain only write requests.
+ a read.ratio of 0.8 means that: 8 queues will contain only read requests
+ and 2 queues will contain only write requests.
+ a read.ratio of 1 means that: 9 queues will contain only read requests
+ and 1 queue will contain only write requests.
+ </description>
+ </property>
+ <property>
+ <name>hbase.ipc.server.callqueue.scan.ratio</name>
+ <value>0</value>
+ <description>Given the number of read call queues, calculated from the total number
+ of call queues multiplied by the callqueue.read.ratio, the scan.ratio property
+ will split the read call queues into short-read and long-read queues.
+ A value lower than 0.5 means that there will be fewer long-read queues than short-read queues.
+ A value of 0.5 means that there will be the same number of short-read and long-read queues.
+ A value greater than 0.5 means that there will be more long-read queues than short-read queues.
+ A value of 0 or 1 indicates to use the same set of queues for gets and scans.
+
+ Example: Given the total number of read call queues being 8
+ a scan.ratio of 0 or 1 means that: 8 queues will contain both long and short read requests.
+ a scan.ratio of 0.3 means that: 2 queues will contain only long-read requests
+ and 6 queues will contain only short-read requests.
+ a scan.ratio of 0.5 means that: 4 queues will contain only long-read requests
+ and 4 queues will contain only short-read requests.
+ a scan.ratio of 0.8 means that: 6 queues will contain only long-read requests
+ and 2 queues will contain only short-read requests.
</description>
</property>
<property>
http://git-wip-us.apache.org/repos/asf/hbase/blob/7df6f580/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 247b7da..602c53e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -40,6 +41,7 @@ import com.google.protobuf.Message;
/**
* RPC Executor that uses different queues for reads and writes.
+ * With the option to use different queues/executors for gets and scans.
* Each handler has its own queue and there is no stealing.
*/
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
@@ -51,20 +53,34 @@ public class RWQueueRpcExecutor extends RpcExecutor {
private final Random balancer = new Random();
private final int writeHandlersCount;
private final int readHandlersCount;
+ private final int scanHandlersCount;
private final int numWriteQueues;
private final int numReadQueues;
+ private final int numScanQueues;
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final int maxQueueLength) {
- this(name, handlerCount, numQueues, readShare, maxQueueLength,
+ this(name, handlerCount, numQueues, readShare, maxQueueLength, 0, LinkedBlockingQueue.class);
+ }
+
+ public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+ final float readShare, final float scanShare, final int maxQueueLength) {
+ this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength,
LinkedBlockingQueue.class);
}
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final int maxQueueLength,
final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
+ this(name, handlerCount, numQueues, readShare, 0, maxQueueLength,
+ readQueueClass, readQueueInitArgs);
+ }
+
+ public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+ final float readShare, final float scanShare, final int maxQueueLength,
+ final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
- calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare),
+ calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,
LinkedBlockingQueue.class, new Object[] {maxQueueLength},
readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
}
@@ -73,23 +89,45 @@ public class RWQueueRpcExecutor extends RpcExecutor {
final int numWriteQueues, final int numReadQueues,
final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
- super(name, Math.max(writeHandlers + readHandlers, numWriteQueues + numReadQueues));
+ this(name, writeHandlers, readHandlers, numWriteQueues, numReadQueues, 0,
+ writeQueueClass, writeQueueInitArgs, readQueueClass, readQueueInitArgs);
+ }
+
+ public RWQueueRpcExecutor(final String name, int writeHandlers, int readHandlers,
+ int numWriteQueues, int numReadQueues, float scanShare,
+ final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
+ final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
+ super(name, Math.max(writeHandlers, numWriteQueues) + Math.max(readHandlers, numReadQueues));
+
+ int numScanQueues = Math.max(0, (int)Math.floor(numReadQueues * scanShare));
+ int scanHandlers = Math.max(0, (int)Math.floor(readHandlers * scanShare));
+ if ((numReadQueues - numScanQueues) > 0) {
+ numReadQueues -= numScanQueues;
+ readHandlers -= scanHandlers;
+ } else {
+ numScanQueues = 0;
+ scanHandlers = 0;
+ }
this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
this.readHandlersCount = Math.max(readHandlers, numReadQueues);
+ this.scanHandlersCount = Math.max(scanHandlers, numScanQueues);
this.numWriteQueues = numWriteQueues;
this.numReadQueues = numReadQueues;
+ this.numScanQueues = numScanQueues;
queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount +
- " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount);
+ " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount +
+ ((numScanQueues == 0) ? "" : " scanQueues=" + numScanQueues +
+ " scanHandlers=" + scanHandlersCount));
for (int i = 0; i < numWriteQueues; ++i) {
queues.add((BlockingQueue<CallRunner>)
ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
}
- for (int i = 0; i < numReadQueues; ++i) {
+ for (int i = 0; i < (numReadQueues + numScanQueues); ++i) {
queues.add((BlockingQueue<CallRunner>)
ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
}
@@ -99,6 +137,8 @@ public class RWQueueRpcExecutor extends RpcExecutor {
protected void startHandlers(final int port) {
startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port);
startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port);
+ startHandlers(".scan", scanHandlersCount, queues,
+ numWriteQueues + numReadQueues, numScanQueues, port);
}
@Override
@@ -107,6 +147,8 @@ public class RWQueueRpcExecutor extends RpcExecutor {
int queueIndex;
if (isWriteRequest(call.getHeader(), call.param)) {
queueIndex = balancer.nextInt(numWriteQueues);
+ } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) {
+ queueIndex = numWriteQueues + numReadQueues + balancer.nextInt(numScanQueues);
} else {
queueIndex = numWriteQueues + balancer.nextInt(numReadQueues);
}
@@ -126,6 +168,19 @@ public class RWQueueRpcExecutor extends RpcExecutor {
}
}
}
+ if (methodName.equalsIgnoreCase("mutate")) {
+ return true;
+ }
+ return false;
+ }
+
+ private boolean isScanRequest(final RequestHeader header, final Message param) {
+ String methodName = header.getMethodName();
+ if (methodName.equalsIgnoreCase("scan")) {
+ // The first scan request will be executed as a "short read"
+ ScanRequest request = (ScanRequest)param;
+ return request.hasScannerId();
+ }
return false;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/7df6f580/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 0458c00..2debe2e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -38,7 +38,10 @@ import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
public class SimpleRpcScheduler extends RpcScheduler {
public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
- public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "hbase.ipc.server.callqueue.read.share";
+ public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
+ "hbase.ipc.server.callqueue.read.ratio";
+ public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY =
+ "hbase.ipc.server.callqueue.scan.ratio";
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
"hbase.ipc.server.callqueue.handler.factor";
@@ -112,6 +115,7 @@ public class SimpleRpcScheduler extends RpcScheduler {
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
+ float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
@@ -123,10 +127,11 @@ public class SimpleRpcScheduler extends RpcScheduler {
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
- callqReadShare, maxQueueLength, BoundedPriorityBlockingQueue.class, callPriority);
+ callqReadShare, callqScanShare, maxQueueLength,
+ BoundedPriorityBlockingQueue.class, callPriority);
} else {
callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
- callqReadShare, maxQueueLength);
+ callqReadShare, callqScanShare, maxQueueLength);
}
} else {
// multiple queues
http://git-wip-us.apache.org/repos/asf/hbase/blob/7df6f580/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index d4a79b6..9d51915 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Before;
import org.junit.Test;
@@ -46,6 +47,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
@@ -185,39 +187,9 @@ public class TestSimpleRpcScheduler {
when(priority.getDeadline(eq(hugeHead), any(Message.class))).thenReturn(100L);
final ArrayList<Integer> work = new ArrayList<Integer>();
-
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocation) {
- synchronized (work) {
- work.add(10);
- }
- Threads.sleepWithoutInterrupt(100);
- return null;
- }
- }).when(smallCallTask).run();
-
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocation) {
- synchronized (work) {
- work.add(50);
- }
- Threads.sleepWithoutInterrupt(100);
- return null;
- }
- }).when(largeCallTask).run();
-
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocation) {
- synchronized (work) {
- work.add(100);
- }
- Threads.sleepWithoutInterrupt(100);
- return null;
- }
- }).when(hugeCallTask).run();
+ doAnswerTaskExecution(smallCallTask, work, 10, 250);
+ doAnswerTaskExecution(largeCallTask, work, 50, 250);
+ doAnswerTaskExecution(hugeCallTask, work, 100, 250);
scheduler.dispatch(smallCallTask);
scheduler.dispatch(smallCallTask);
@@ -253,4 +225,84 @@ public class TestSimpleRpcScheduler {
scheduler.stop();
}
}
+
+ @Test
+ public void testScanQueues() throws Exception { // dispatches put/get/scan calls and checks that their execution is interleaved
+ Configuration schedConf = HBaseConfiguration.create();
+ schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); // number of call queues is derived from handlerCount * factor
+ schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f); // key "hbase.ipc.server.callqueue.read.ratio"
+ schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); // key "hbase.ipc.server.callqueue.scan.ratio"
+
+ PriorityFunction priority = mock(PriorityFunction.class);
+ when(priority.getPriority(any(RequestHeader.class), any(Message.class)))
+ .thenReturn(HConstants.NORMAL_QOS); // every call reports the same priority
+
+ RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, // presumably 3/1/1 are handler counts - constructor signature not visible here
+ HConstants.QOS_THRESHOLD);
+ try {
+ scheduler.start();
+
+ CallRunner putCallTask = mock(CallRunner.class);
+ RpcServer.Call putCall = mock(RpcServer.Call.class);
+ RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); // "mutate" stands in for a put
+ when(putCallTask.getCall()).thenReturn(putCall);
+ when(putCall.getHeader()).thenReturn(putHead);
+
+ CallRunner getCallTask = mock(CallRunner.class);
+ RpcServer.Call getCall = mock(RpcServer.Call.class);
+ RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
+ when(getCallTask.getCall()).thenReturn(getCall);
+ when(getCall.getHeader()).thenReturn(getHead);
+
+ CallRunner scanCallTask = mock(CallRunner.class);
+ RpcServer.Call scanCall = mock(RpcServer.Call.class);
+ scanCall.param = ScanRequest.newBuilder().setScannerId(1).build(); // a scanner id is set on the request
+ RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
+ when(scanCallTask.getCall()).thenReturn(scanCall);
+ when(scanCall.getHeader()).thenReturn(scanHead);
+
+ ArrayList<Integer> work = new ArrayList<Integer>(); // execution log shared with the handler threads
+ doAnswerTaskExecution(putCallTask, work, 1, 1000); // put records 1, then sleeps 1000 ms
+ doAnswerTaskExecution(getCallTask, work, 2, 1000); // get records 2, then sleeps 1000 ms
+ doAnswerTaskExecution(scanCallTask, work, 3, 1000); // scan records 3, then sleeps 1000 ms
+
+ // There are 3 queues: [puts], [gets], [scans]
+ // so the calls will be interleaved
+ scheduler.dispatch(putCallTask);
+ scheduler.dispatch(putCallTask);
+ scheduler.dispatch(putCallTask);
+ scheduler.dispatch(getCallTask);
+ scheduler.dispatch(getCallTask);
+ scheduler.dispatch(getCallTask);
+ scheduler.dispatch(scanCallTask);
+ scheduler.dispatch(scanCallTask);
+ scheduler.dispatch(scanCallTask);
+
+ while (work.size() < 6) { // NOTE(review): read without synchronized (work) although writers add under that lock; waits for 6 of the 9 dispatched calls
+ Threads.sleepWithoutInterrupt(100); // poll every 100 ms
+ }
+
+ for (int i = 0; i < work.size() - 2; i += 3) { // walk the log in groups of three; NOTE(review): handlers may still be appending here
+ assertNotEquals(work.get(i + 0), work.get(i + 1)); // the three entries of a group must be pairwise different
+ assertNotEquals(work.get(i + 0), work.get(i + 2));
+ assertNotEquals(work.get(i + 1), work.get(i + 2));
+ }
+ } finally {
+ scheduler.stop(); // always stop the scheduler, even when an assertion fails
+ }
+ }
+
+ private void doAnswerTaskExecution(final CallRunner callTask, // stubs callTask.run() to log a value and then sleep
+ final ArrayList<Integer> results, final int value, final int sleepInterval) { // results: shared log; value: entry to append; sleepInterval: passed to Threads.sleepWithoutInterrupt
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) { // body executed on each stubbed run() call
+ synchronized (results) { // appends are serialized on the list's own monitor
+ results.add(value);
+ }
+ Threads.sleepWithoutInterrupt(sleepInterval); // sleep happens after the append, outside the lock
+ return null; // run() is void, so the answer returns nothing
+ }
+ }).when(callTask).run();
+ }
}