You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2016/06/25 20:24:32 UTC
hbase git commit: HBASE-16089 Add on FastPath for CoDel
Repository: hbase
Updated Branches:
refs/heads/branch-1.3 e7bfd9bfe -> 65cbb6c2e
HBASE-16089 Add on FastPath for CoDel
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/65cbb6c2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/65cbb6c2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/65cbb6c2
Branch: refs/heads/branch-1.3
Commit: 65cbb6c2e7335596c6edf8c0243bcd66722641bb
Parents: e7bfd9b
Author: Elliott Clark <ec...@apache.org>
Authored: Wed Jun 22 16:34:40 2016 -0700
Committer: Elliott Clark <ec...@apache.org>
Committed: Sat Jun 25 12:59:37 2016 -0700
----------------------------------------------------------------------
.../hbase/ipc/AdaptiveLifoCoDelCallQueue.java | 59 ++++++---
.../org/apache/hadoop/hbase/ipc/CallRunner.java | 4 +-
.../ipc/FastPathBalancedQueueRpcExecutor.java | 126 +++++++++++++++++++
...ifoWithFastPathBalancedQueueRpcExecutor.java | 116 -----------------
.../hadoop/hbase/ipc/SimpleRpcScheduler.java | 12 +-
.../hbase/ipc/TestSimpleRpcScheduler.java | 67 ++++++----
6 files changed, 221 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/65cbb6c2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
index 08c488b..42b500f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
@@ -73,7 +73,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
private AtomicBoolean resetDelay = new AtomicBoolean(true);
// if we're in this mode, "long" calls are getting dropped
- private volatile boolean isOverloaded;
+ private AtomicBoolean isOverloaded = new AtomicBoolean(false);
public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval,
double lifoThreshold, AtomicLong numGeneralCallsDropped, AtomicLong numLifoModeSwitches) {
@@ -126,6 +126,34 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
}
}
+ @Override
+ public CallRunner poll() {
+ CallRunner cr;
+ boolean switched = false;
+ while(true) {
+ if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
+ // Only count once per switch.
+ if (!switched) {
+ switched = true;
+ numLifoModeSwitches.incrementAndGet();
+ }
+ cr = queue.pollLast();
+ } else {
+ switched = false;
+ cr = queue.pollFirst();
+ }
+ if (cr == null) {
+ return cr;
+ }
+ if (needToDrop(cr)) {
+ numGeneralCallsDropped.incrementAndGet();
+ cr.drop();
+ } else {
+ return cr;
+ }
+ }
+ }
+
/**
* @param callRunner to validate
* @return true if this call needs to be skipped based on call timestamp
@@ -136,28 +164,28 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
long callDelay = now - callRunner.getCall().timestamp;
long localMinDelay = this.minDelay;
- if (now > intervalTime && !resetDelay.getAndSet(true)) {
+
+ // Try and determine if we should reset
+ // the delay time and determine overload
+ if (now > intervalTime &&
+ !resetDelay.get() &&
+ !resetDelay.getAndSet(true)) {
intervalTime = now + codelInterval;
- if (localMinDelay > codelTargetDelay) {
- isOverloaded = true;
- } else {
- isOverloaded = false;
- }
+ isOverloaded.set(localMinDelay > codelTargetDelay);
}
- if (resetDelay.getAndSet(false)) {
+ // If it looks like we should reset the delay
+ // time do it only once on one thread
+ if (resetDelay.get() && resetDelay.getAndSet(false)) {
minDelay = callDelay;
+ // We just reset the delay, so do not drop this call; unsure how well this will work.
return false;
} else if (callDelay < localMinDelay) {
minDelay = callDelay;
}
- if (isOverloaded && callDelay > 2 * codelTargetDelay) {
- return true;
- } else {
- return false;
- }
+ return isOverloaded.get() && callDelay > 2 * codelTargetDelay;
}
// Generic BlockingQueue methods we support
@@ -185,11 +213,6 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
+ " but take() and offer() methods");
}
- @Override
- public CallRunner poll() {
- throw new UnsupportedOperationException("This class doesn't support anything,"
- + " but take() and offer() methods");
- }
@Override
public CallRunner peek() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/65cbb6c2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 00e08c9..e91699a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -64,7 +64,9 @@ public class CallRunner {
this.call = call;
this.rpcServer = rpcServer;
// Add size of the call to queue size.
- this.rpcServer.addCallSize(call.getSize());
+ if (call != null && rpcServer != null) {
+ this.rpcServer.addCallSize(call.getSize());
+ }
}
public Call getCall() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/65cbb6c2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
new file mode 100644
index 0000000..4e06f4f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.Deque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Balanced queue executor with a fastpath. Because this is FIFO, it has no respect for
+ * ordering, so a fast path that skips queuing a Call when a Handler is available is possible.
+ * Just pass the Call directly to a waiting Handler thread. Try to keep the hot Handlers bubbling
+ * rather than let them go cold and lose context. Idea taken from Apache Kudu (incubating). See
+ * https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h
+ */
+@InterfaceAudience.Private
+public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
+ // Depends on default behavior of BalancedQueueRpcExecutor being FIFO!
+
+ /*
+ * Stack of Handlers waiting for work.
+ */
+ private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
+
+ public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
+ final int numQueues, final int maxQueueLength, final Configuration conf,
+ final Abortable abortable) {
+ super(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class,
+ maxQueueLength);
+ }
+
+ public FastPathBalancedQueueRpcExecutor(String name, int handlerCount,
+ int numCallQueues,
+ Configuration conf,
+ Abortable abortable,
+ Class<? extends BlockingQueue> queueClass,
+ Object... args) {
+ super(name, handlerCount, numCallQueues, conf, abortable, queueClass, args);
+ }
+
+ @Override
+ protected Handler getHandler(String name, double handlerFailureThreshhold,
+ BlockingQueue<CallRunner> q) {
+ return new FastPathHandler(name, handlerFailureThreshhold, q, fastPathHandlerStack);
+ }
+
+ @Override
+ public boolean dispatch(CallRunner callTask) throws InterruptedException {
+ FastPathHandler handler = popReadyHandler();
+ return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask);
+ }
+
+ /**
+ * @return Pop a Handler instance if one available ready-to-go or else return null.
+ */
+ private FastPathHandler popReadyHandler() {
+ return this.fastPathHandlerStack.poll();
+ }
+
+ class FastPathHandler extends Handler {
+ // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
+ // when the queue of CallRunners is empty so we are available for direct handoff when one comes in.
+ final Deque<FastPathHandler> fastPathHandlerStack;
+ // Semaphore to coordinate loading of fastpathed loadedTask and our running it.
+ private Semaphore semaphore = new Semaphore(0);
+ // The task we get when fast-pathing.
+ private CallRunner loadedCallRunner;
+
+ FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q,
+ final Deque<FastPathHandler> fastPathHandlerStack) {
+ super(name, handlerFailureThreshhold, q);
+ this.fastPathHandlerStack = fastPathHandlerStack;
+ }
+
+ protected CallRunner getCallRunner() throws InterruptedException {
+ // Get a callrunner if one in the Q.
+ CallRunner cr = this.q.poll();
+ if (cr == null) {
+ // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
+ // the fastpath handoff done via fastPathHandlerStack.
+ if (this.fastPathHandlerStack != null) {
+ this.fastPathHandlerStack.push(this);
+ this.semaphore.acquire();
+ cr = this.loadedCallRunner;
+ this.loadedCallRunner = null;
+ } else {
+ // No fastpath available. Block until a task comes available.
+ cr = super.getCallRunner();
+ }
+ }
+ return cr;
+ }
+
+ /**
+ * @param cr Task gotten via fastpath.
+ * @return True if we successfully loaded our task
+ */
+ boolean loadCallRunner(final CallRunner cr) {
+ this.loadedCallRunner = cr;
+ this.semaphore.release();
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/65cbb6c2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
deleted file mode 100644
index 1a362bc..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import java.util.Deque;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * FIFO balanced queue executor with a fastpath. Because this is FIFO, it has no respect for
- * ordering so a fast path skipping the queuing of Calls if an Handler is available, is possible.
- * Just pass the Call direct to waiting Handler thread. Try to keep the hot Handlers bubbling
- * rather than let them go cold and lose context. Idea taken from Apace Kudu (incubating). See
- * https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h
- */
-@InterfaceAudience.Private
-public class FifoWithFastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
- // Depends on default behavior of BalancedQueueRpcExecutor being FIFO!
-
- /*
- * Stack of Handlers waiting for work.
- */
- private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
-
- public FifoWithFastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
- final int numQueues, final int maxQueueLength, final Configuration conf,
- final Abortable abortable) {
- super(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class,
- maxQueueLength);
- }
-
- @Override
- protected Handler getHandler(String name, double handlerFailureThreshhold,
- BlockingQueue<CallRunner> q) {
- return new FastPathHandler(name, handlerFailureThreshhold, q, fastPathHandlerStack);
- }
-
- @Override
- public boolean dispatch(CallRunner callTask) throws InterruptedException {
- FastPathHandler handler = popReadyHandler();
- return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask);
- }
-
- /**
- * @return Pop a Handler instance if one available ready-to-go or else return null.
- */
- private FastPathHandler popReadyHandler() {
- return this.fastPathHandlerStack.poll();
- }
-
- class FastPathHandler extends Handler {
- // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
- // if an empty queue of CallRunners so we are available for direct handoff when one comes in.
- final Deque<FastPathHandler> fastPathHandlerStack;
- // Semaphore to coordinate loading of fastpathed loadedTask and our running it.
- private Semaphore semaphore = new Semaphore(0);
- // The task we get when fast-pathing.
- private CallRunner loadedCallRunner;
-
- FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q,
- final Deque<FastPathHandler> fastPathHandlerStack) {
- super(name, handlerFailureThreshhold, q);
- this.fastPathHandlerStack = fastPathHandlerStack;
- }
-
- protected CallRunner getCallRunner() throws InterruptedException {
- // Get a callrunner if one in the Q.
- CallRunner cr = this.q.poll();
- if (cr == null) {
- // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
- // the fastpath handoff done via fastPathHandlerStack.
- if (this.fastPathHandlerStack != null) {
- this.fastPathHandlerStack.push(this);
- this.semaphore.acquire();
- cr = this.loadedCallRunner;
- this.loadedCallRunner = null;
- } else {
- // No fastpath available. Block until a task comes available.
- cr = super.getCallRunner();
- }
- }
- return cr;
- }
-
- /**
- * @param task Task gotten via fastpath.
- * @return True if we successfully loaded our task
- */
- boolean loadCallRunner(final CallRunner cr) {
- this.loadedCallRunner = cr;
- this.semaphore.release();
- return true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/65cbb6c2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 5e68dc8..7574965 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -72,7 +72,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD =
"hbase.ipc.server.callqueue.codel.lifo.threshold";
- public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 5;
+ public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;
@@ -226,23 +226,23 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
} else if (isCodelQueueType(callQueueType)) {
callExecutor =
- new BalancedQueueRpcExecutor("BQCodel.default", handlerCount, numCallQueues,
+ new FastPathBalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount, numCallQueues,
conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength,
codelTargetDelay, codelInterval, codelLifoThreshold,
numGeneralCallsDropped, numLifoModeSwitches);
} else {
- // FifoWFPBQ = FifoWithFastPathBalancedQueueRpcExecutor
- callExecutor = new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.default",
+ // FifoWFPBQ = FastPathBalancedQueueRpcExecutor
+ callExecutor = new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.default",
handlerCount, numCallQueues, maxQueueLength, conf, abortable);
}
}
// Create 2 queues to help priorityExecutor be more scalable.
this.priorityExecutor = priorityHandlerCount > 0?
- new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.priority", priorityHandlerCount,
+ new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.priority", priorityHandlerCount,
2, maxPriorityQueueLength, conf, abortable): null;
this.replicationExecutor = replicationHandlerCount > 0?
- new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.replication",
+ new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.replication",
replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/65cbb6c2/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 1fda747..53addf1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -40,6 +40,7 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -398,7 +399,8 @@ public class TestSimpleRpcScheduler {
@Override
public long currentTime() {
for (String threadNamePrefix : threadNamePrefixs) {
- if (Thread.currentThread().getName().startsWith(threadNamePrefix)) {
+ String threadName = Thread.currentThread().getName();
+ if (threadName.startsWith(threadNamePrefix)) {
return timeQ.poll().longValue() + offset;
}
}
@@ -409,9 +411,9 @@ public class TestSimpleRpcScheduler {
@Test
public void testCoDelScheduling() throws Exception {
CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
- envEdge.threadNamePrefixs.add("RW.default");
- envEdge.threadNamePrefixs.add("B.default");
+ envEdge.threadNamePrefixs.add("RpcServer.CodelBQ.default.handler");
Configuration schedConf = HBaseConfiguration.create();
+ schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250);
schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY,
SimpleRpcScheduler.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
@@ -429,8 +431,7 @@ public class TestSimpleRpcScheduler {
for (int i = 0; i < 100; i++) {
long time = System.currentTimeMillis();
envEdge.timeQ.put(time);
- CallRunner cr = getMockedCallRunner(time);
- Thread.sleep(5);
+ CallRunner cr = getMockedCallRunner(time, 2);
scheduler.dispatch(cr);
}
// make sure fast calls are handled
@@ -439,13 +440,12 @@ public class TestSimpleRpcScheduler {
assertEquals("None of these calls should have been discarded", 0,
scheduler.getNumGeneralCallsDropped());
- envEdge.offset = 6;
+ envEdge.offset = 151;
// calls slower than min delay, but not individually slow enough to be dropped
for (int i = 0; i < 20; i++) {
long time = System.currentTimeMillis();
envEdge.timeQ.put(time);
- CallRunner cr = getMockedCallRunner(time);
- Thread.sleep(6);
+ CallRunner cr = getMockedCallRunner(time, 2);
scheduler.dispatch(cr);
}
@@ -455,35 +455,58 @@ public class TestSimpleRpcScheduler {
assertEquals("None of these calls should have been discarded", 0,
scheduler.getNumGeneralCallsDropped());
- envEdge.offset = 12;
+ envEdge.offset = 2000;
// now slow calls and the ones to be dropped
- for (int i = 0; i < 20; i++) {
+ for (int i = 0; i < 60; i++) {
long time = System.currentTimeMillis();
envEdge.timeQ.put(time);
- CallRunner cr = getMockedCallRunner(time);
- Thread.sleep(12);
+ CallRunner cr = getMockedCallRunner(time, 100);
scheduler.dispatch(cr);
}
// make sure somewhat slow calls are handled
waitUntilQueueEmpty(scheduler);
Thread.sleep(100);
- assertTrue("There should have been at least 12 calls dropped",
- scheduler.getNumGeneralCallsDropped() > 12);
+ assertTrue(
+ "There should have been at least 12 calls dropped however there were "
+ + scheduler.getNumGeneralCallsDropped(),
+ scheduler.getNumGeneralCallsDropped() > 12);
} finally {
scheduler.stop();
}
}
- private CallRunner getMockedCallRunner(long timestamp) throws IOException {
- CallRunner putCallTask = mock(CallRunner.class);
- RpcServer.Call putCall = mock(RpcServer.Call.class);
+ // Get mocked call that has the CallRunner sleep for a while so that the fast
+ // path isn't hit.
+ private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {
+ final RpcServer.Call putCall = mock(RpcServer.Call.class);
+
+ putCall.timestamp = timestamp;
putCall.param = RequestConverter.buildMutateRequest(
- Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
- RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build();
- when(putCallTask.getCall()).thenReturn(putCall);
+ Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
+
+ RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder()
+ .setMethodName("mutate")
+ .build();
+ when(putCall.getSize()).thenReturn(9L);
when(putCall.getHeader()).thenReturn(putHead);
- putCall.timestamp = timestamp;
- return putCallTask;
+
+ CallRunner cr = new CallRunner(null, putCall) {
+ public void run() {
+ try {
+ LOG.warn("Sleeping for " + sleepTime);
+ Thread.sleep(sleepTime);
+ LOG.warn("Done Sleeping for " + sleepTime);
+ } catch (InterruptedException e) {
+ }
+ }
+ public Call getCall() {
+ return putCall;
+ }
+
+ public void drop() {}
+ };
+
+ return cr;
}
}