You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2017/06/06 13:35:59 UTC
hadoop git commit: HADOOP-14035. Reduce fair call queue backoff's
impact on clients. Contributed by Daryn Sharp.
Repository: hadoop
Updated Branches:
refs/heads/trunk 66c6fd831 -> 855e0477b
HADOOP-14035. Reduce fair call queue backoff's impact on clients. Contributed by Daryn Sharp.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/855e0477
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/855e0477
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/855e0477
Branch: refs/heads/trunk
Commit: 855e0477b1706a2d5b0df6a2b0e461aeec8839c2
Parents: 66c6fd8
Author: Kihwal Lee <ki...@apache.org>
Authored: Tue Jun 6 08:34:33 2017 -0500
Committer: Kihwal Lee <ki...@apache.org>
Committed: Tue Jun 6 08:35:12 2017 -0500
----------------------------------------------------------------------
.../org/apache/hadoop/ipc/CallQueueManager.java | 127 ++++++++++++++-
.../org/apache/hadoop/ipc/FairCallQueue.java | 90 ++++++++---
.../main/java/org/apache/hadoop/ipc/Server.java | 22 ++-
.../apache/hadoop/ipc/TestCallQueueManager.java | 78 +++++++++-
.../apache/hadoop/ipc/TestFairCallQueue.java | 156 +++++++++++++++++++
.../java/org/apache/hadoop/ipc/TestRPC.java | 4 +-
6 files changed, 441 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/855e0477/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index 50ed353..2764788 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -18,8 +18,12 @@
package org.apache.hadoop.ipc;
+import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -28,11 +32,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* Abstracts queue operations for different blocking queues.
*/
-public class CallQueueManager<E> {
+public class CallQueueManager<E extends Schedulable>
+ extends AbstractQueue<E> implements BlockingQueue<E> {
public static final Log LOG = LogFactory.getLog(CallQueueManager.class);
// Number of checkpoints for empty queue.
private static final int CHECKPOINT_NUM = 20;
@@ -76,6 +84,15 @@ public class CallQueueManager<E> {
maxQueueSize + " scheduler: " + schedulerClass);
}
+ @VisibleForTesting // only!
+ CallQueueManager(BlockingQueue<E> queue, RpcScheduler scheduler,
+ boolean clientBackOffEnabled) {
+ this.putRef = new AtomicReference<BlockingQueue<E>>(queue);
+ this.takeRef = new AtomicReference<BlockingQueue<E>>(queue);
+ this.scheduler = scheduler;
+ this.clientBackOffEnabled = clientBackOffEnabled;
+ }
+
private static <T extends RpcScheduler> T createScheduler(
Class<T> theClass, int priorityLevels, String ns, Configuration conf) {
// Used for custom, configurable scheduler
@@ -190,12 +207,40 @@ public class CallQueueManager<E> {
}
/**
- * Insert e into the backing queue or block until we can.
+ * Insert e into the backing queue or block until we can. If client
+ * backoff is enabled this method behaves like add which throws if
+ * the queue overflows.
* If we block and the queue changes on us, we will insert while the
* queue is drained.
*/
+ @Override
public void put(E e) throws InterruptedException {
- putRef.get().put(e);
+ if (!isClientBackoffEnabled()) {
+ putRef.get().put(e);
+ } else if (shouldBackOff(e)) {
+ throwBackoff();
+ } else {
+ add(e);
+ }
+ }
+
+ @Override
+ public boolean add(E e) {
+ try {
+ return putRef.get().add(e);
+ } catch (CallQueueOverflowException ex) {
+ // queue provided a custom exception that may control if the client
+ // should be disconnected.
+ throw ex;
+ } catch (IllegalStateException ise) {
+ throwBackoff();
+ }
+ return true;
+ }
+
+ // ideally this behavior should be controllable too.
+ private void throwBackoff() throws IllegalStateException {
+ throw CallQueueOverflowException.DISCONNECT;
}
/**
@@ -203,14 +248,37 @@ public class CallQueueManager<E> {
* Return true if e is queued.
* Return false if the queue is full.
*/
- public boolean offer(E e) throws InterruptedException {
+ @Override
+ public boolean offer(E e) {
return putRef.get().offer(e);
}
+ @Override
+ public boolean offer(E e, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return putRef.get().offer(e, timeout, unit);
+ }
+
+ @Override
+ public E peek() {
+ return takeRef.get().peek();
+ }
+
+ @Override
+ public E poll() {
+ return takeRef.get().poll();
+ }
+
+ @Override
+ public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+ return takeRef.get().poll(timeout, unit);
+ }
+
/**
* Retrieve an E from the backing queue or block until we can.
* Guaranteed to return an element from the current queue.
*/
+ @Override
public E take() throws InterruptedException {
E e = null;
@@ -221,10 +289,16 @@ public class CallQueueManager<E> {
return e;
}
+ @Override
public int size() {
return takeRef.get().size();
}
+ @Override
+ public int remainingCapacity() {
+ return takeRef.get().remainingCapacity();
+ }
+
/**
* Read the number of levels from the configuration.
* This will affect the FairCallQueue's overall capacity.
@@ -308,4 +382,49 @@ public class CallQueueManager<E> {
private String stringRepr(Object o) {
return o.getClass().getName() + '@' + Integer.toHexString(o.hashCode());
}
+
+ @Override
+ public int drainTo(Collection<? super E> c) {
+ return takeRef.get().drainTo(c);
+ }
+
+ @Override
+ public int drainTo(Collection<? super E> c, int maxElements) {
+ return takeRef.get().drainTo(c, maxElements);
+ }
+
+ @Override
+ public Iterator<E> iterator() {
+ return takeRef.get().iterator();
+ }
+
+ // exception that mimics the standard ISE thrown by blocking queues but
+ // embeds a rpc server exception for the client to retry and indicate
+ // if the client should be disconnected.
+ @SuppressWarnings("serial")
+ static class CallQueueOverflowException extends IllegalStateException {
+ private static String TOO_BUSY = "Server too busy";
+ static final CallQueueOverflowException KEEPALIVE =
+ new CallQueueOverflowException(
+ new RetriableException(TOO_BUSY),
+ RpcStatusProto.ERROR);
+ static final CallQueueOverflowException DISCONNECT =
+ new CallQueueOverflowException(
+ new RetriableException(TOO_BUSY + " - disconnecting"),
+ RpcStatusProto.FATAL);
+
+ CallQueueOverflowException(final IOException ioe,
+ final RpcStatusProto status) {
+ super("Queue full", new RpcServerException(ioe.getMessage(), ioe){
+ @Override
+ public RpcStatusProto getRpcStatusProto() {
+ return status;
+ }
+ });
+ }
+ @Override
+ public IOException getCause() {
+ return (IOException)super.getCause();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/855e0477/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
index 820f24c..8bcaf05 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
@@ -36,6 +36,7 @@ import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
import org.apache.hadoop.metrics2.util.MBeans;
/**
@@ -134,45 +135,84 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
/* AbstractQueue and BlockingQueue methods */
/**
- * Put and offer follow the same pattern:
+ * Add, put, and offer follow the same pattern:
* 1. Get the assigned priorityLevel from the call by scheduler
* 2. Get the nth sub-queue matching this priorityLevel
* 3. delegate the call to this sub-queue.
*
* But differ in how they handle overflow:
- * - Put will move on to the next queue until it lands on the last queue
+ * - Add will move on to the next queue, throw on last queue overflow
+ * - Put will move on to the next queue, block on last queue overflow
* - Offer does not attempt other queues on overflow
*/
- @Override
- public void put(E e) throws InterruptedException {
- int priorityLevel = e.getPriorityLevel();
- final int numLevels = this.queues.size();
- while (true) {
- BlockingQueue<E> q = this.queues.get(priorityLevel);
- boolean res = q.offer(e);
- if (!res) {
- // Update stats
- this.overflowedCalls.get(priorityLevel).getAndIncrement();
-
- // If we failed to insert, try again on the next level
- priorityLevel++;
-
- if (priorityLevel == numLevels) {
- // That was the last one, we will block on put in the last queue
- // Delete this line to drop the call
- this.queues.get(priorityLevel-1).put(e);
- break;
- }
- } else {
- break;
- }
+ @Override
+ public boolean add(E e) {
+ final int priorityLevel = e.getPriorityLevel();
+ // try offering to all queues.
+ if (!offerQueues(priorityLevel, e, true)) {
+ // only disconnect the lowest priority users that overflow the queue.
+ throw (priorityLevel == queues.size() - 1)
+ ? CallQueueOverflowException.DISCONNECT
+ : CallQueueOverflowException.KEEPALIVE;
}
+ return true;
+ }
+ @Override
+ public void put(E e) throws InterruptedException {
+ final int priorityLevel = e.getPriorityLevel();
+ // try offering to all but last queue, put on last.
+ if (!offerQueues(priorityLevel, e, false)) {
+ putQueue(queues.size() - 1, e);
+ }
+ }
+ /**
+ * Put the element in a queue of a specific priority.
+ * @param priority - queue priority
+ * @param e - element to add
+ */
+ @VisibleForTesting
+ void putQueue(int priority, E e) throws InterruptedException {
+ queues.get(priority).put(e);
signalNotEmpty();
}
+ /**
+ * Offer the element to queue of a specific priority.
+ * @param priority - queue priority
+ * @param e - element to add
+ * @return boolean if added to the given queue
+ */
+ @VisibleForTesting
+ boolean offerQueue(int priority, E e) {
+ boolean ret = queues.get(priority).offer(e);
+ if (ret) {
+ signalNotEmpty();
+ }
+ return ret;
+ }
+
+ /**
+ * Offer the element to queue of the given or lower priority.
+ * @param priority - starting queue priority
+ * @param e - element to add
+ * @param includeLast - whether to attempt last queue
+ * @return boolean if added to a queue
+ */
+ private boolean offerQueues(int priority, E e, boolean includeLast) {
+ int lastPriority = queues.size() - (includeLast ? 1 : 2);
+ for (int i=priority; i <= lastPriority; i++) {
+ if (offerQueue(i, e)) {
+ return true;
+ }
+ // Update stats
+ overflowedCalls.get(i).getAndIncrement();
+ }
+ return false;
+ }
+
@Override
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/855e0477/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index f3b9a82..df108b8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
@@ -2479,7 +2480,9 @@ public abstract class Server {
call.setPriorityLevel(callQueue.getPriorityLevel(call));
try {
- queueCall(call);
+ internalQueueCall(call);
+ } catch (RpcServerException rse) {
+ throw rse;
} catch (IOException ioe) {
throw new FatalRpcServerException(
RpcErrorCodeProto.ERROR_RPC_SERVER, ioe);
@@ -2616,9 +2619,19 @@ public abstract class Server {
}
public void queueCall(Call call) throws IOException, InterruptedException {
- if (!callQueue.isClientBackoffEnabled()) {
+ // external non-rpc calls don't need server exception wrapper.
+ try {
+ internalQueueCall(call);
+ } catch (RpcServerException rse) {
+ throw (IOException)rse.getCause();
+ }
+ }
+
+ private void internalQueueCall(Call call)
+ throws IOException, InterruptedException {
+ try {
callQueue.put(call); // queue the call; maybe blocked here
- } else if (callQueue.shouldBackOff(call) || !callQueue.offer(call)) {
+ } catch (CallQueueOverflowException cqe) {
// If rpc scheduler indicates back off based on performance degradation
// such as response time or rpc queue is full, we will ask the client
// to back off by throwing RetriableException. Whether the client will
@@ -2626,7 +2639,8 @@ public abstract class Server {
// For example, IPC clients using FailoverOnNetworkExceptionRetry handle
// RetriableException.
rpcMetrics.incrClientBackoff();
- throw new RetriableException("Server is too busy.");
+ // unwrap retriable exception.
+ throw cqe.getCause();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/855e0477/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
index 1211657..a5a0b00 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
@@ -19,8 +19,14 @@
package org.apache.hadoop.ipc;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import java.util.ArrayList;
import java.util.HashMap;
@@ -29,8 +35,10 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;
+import org.mockito.Mockito;
public class TestCallQueueManager {
private CallQueueManager<FakeCall> manager;
@@ -311,11 +319,21 @@ public class TestCallQueueManager {
assertEquals(totalCallsConsumed, totalCallsCreated);
}
- public static class ExceptionFakeCall {
+ public static class ExceptionFakeCall implements Schedulable {
public ExceptionFakeCall() {
throw new IllegalArgumentException("Exception caused by call queue " +
"constructor.!!");
}
+
+ @Override
+ public UserGroupInformation getUserGroupInformation() {
+ return null;
+ }
+
+ @Override
+ public int getPriorityLevel() {
+ return 0;
+ }
}
public static class ExceptionFakeScheduler {
@@ -359,4 +377,62 @@ public class TestCallQueueManager {
.getMessage());
}
}
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testCallQueueOverflowExceptions() throws Exception {
+ RpcScheduler scheduler = Mockito.mock(RpcScheduler.class);
+ BlockingQueue<Schedulable> queue = Mockito.mock(BlockingQueue.class);
+ CallQueueManager<Schedulable> cqm =
+ Mockito.spy(new CallQueueManager<>(queue, scheduler, false));
+ Schedulable call = new FakeCall(0);
+
+ // call queue exceptions passed threw as-is
+ doThrow(CallQueueOverflowException.KEEPALIVE).when(queue).add(call);
+ try {
+ cqm.add(call);
+ fail("didn't throw");
+ } catch (CallQueueOverflowException cqe) {
+ assertSame(CallQueueOverflowException.KEEPALIVE, cqe);
+ }
+
+ // standard exception for blocking queue full converted to overflow
+ // exception.
+ doThrow(new IllegalStateException()).when(queue).add(call);
+ try {
+ cqm.add(call);
+ fail("didn't throw");
+ } catch (Exception ex) {
+ assertTrue(ex.toString(), ex instanceof CallQueueOverflowException);
+ }
+
+ // backoff disabled, put is put to queue.
+ reset(queue);
+ cqm.setClientBackoffEnabled(false);
+ cqm.put(call);
+ verify(queue, times(1)).put(call);
+ verify(queue, times(0)).add(call);
+
+ // backoff enabled, put is add to queue.
+ reset(queue);
+ cqm.setClientBackoffEnabled(true);
+ doReturn(Boolean.FALSE).when(cqm).shouldBackOff(call);
+ cqm.put(call);
+ verify(queue, times(0)).put(call);
+ verify(queue, times(1)).add(call);
+ reset(queue);
+
+ // backoff is enabled, put + scheduler backoff = overflow exception.
+ reset(queue);
+ cqm.setClientBackoffEnabled(true);
+ doReturn(Boolean.TRUE).when(cqm).shouldBackOff(call);
+ try {
+ cqm.put(call);
+ fail("didn't fail");
+ } catch (Exception ex) {
+ assertTrue(ex.toString(), ex instanceof CallQueueOverflowException);
+ }
+ verify(queue, times(0)).put(call);
+ verify(queue, times(0)).add(call);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/855e0477/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
index 901a771..6b1cd29 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
@@ -18,13 +18,19 @@
package org.apache.hadoop.ipc;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyObject;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
import junit.framework.TestCase;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+
+import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -34,7 +40,10 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;
+import org.mockito.Mockito;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
public class TestFairCallQueue extends TestCase {
private FairCallQueue<Schedulable> fcq;
@@ -133,6 +142,153 @@ public class TestFairCallQueue extends TestCase {
assertNull(fcq.poll());
}
+ @SuppressWarnings("unchecked") // for mock reset.
+ @Test
+ public void testInsertion() throws Exception {
+ Configuration conf = new Configuration();
+ // 3 queues, 2 slots each.
+ fcq = Mockito.spy(new FairCallQueue<Schedulable>(3, 6, "ns", conf));
+
+ Schedulable p0 = mockCall("a", 0);
+ Schedulable p1 = mockCall("b", 1);
+ Schedulable p2 = mockCall("c", 2);
+
+ // add to first queue.
+ Mockito.reset(fcq);
+ fcq.add(p0);
+ Mockito.verify(fcq, times(1)).offerQueue(0, p0);
+ Mockito.verify(fcq, times(0)).offerQueue(1, p0);
+ Mockito.verify(fcq, times(0)).offerQueue(2, p0);
+ Mockito.reset(fcq);
+ // 0:x- 1:-- 2:--
+
+ // add to second queue.
+ Mockito.reset(fcq);
+ fcq.add(p1);
+ Mockito.verify(fcq, times(0)).offerQueue(0, p1);
+ Mockito.verify(fcq, times(1)).offerQueue(1, p1);
+ Mockito.verify(fcq, times(0)).offerQueue(2, p1);
+ // 0:x- 1:x- 2:--
+
+ // add to first queue.
+ Mockito.reset(fcq);
+ fcq.add(p0);
+ Mockito.verify(fcq, times(1)).offerQueue(0, p0);
+ Mockito.verify(fcq, times(0)).offerQueue(1, p0);
+ Mockito.verify(fcq, times(0)).offerQueue(2, p0);
+ // 0:xx 1:x- 2:--
+
+ // add to first full queue spills over to second.
+ Mockito.reset(fcq);
+ fcq.add(p0);
+ Mockito.verify(fcq, times(1)).offerQueue(0, p0);
+ Mockito.verify(fcq, times(1)).offerQueue(1, p0);
+ Mockito.verify(fcq, times(0)).offerQueue(2, p0);
+ // 0:xx 1:xx 2:--
+
+ // add to second full queue spills over to third.
+ Mockito.reset(fcq);
+ fcq.add(p1);
+ Mockito.verify(fcq, times(0)).offerQueue(0, p1);
+ Mockito.verify(fcq, times(1)).offerQueue(1, p1);
+ Mockito.verify(fcq, times(1)).offerQueue(2, p1);
+ // 0:xx 1:xx 2:x-
+
+ // add to first and second full queue spills over to third.
+ Mockito.reset(fcq);
+ fcq.add(p0);
+ Mockito.verify(fcq, times(1)).offerQueue(0, p0);
+ Mockito.verify(fcq, times(1)).offerQueue(1, p0);
+ Mockito.verify(fcq, times(1)).offerQueue(2, p0);
+ // 0:xx 1:xx 2:xx
+
+ // adding non-lowest priority with all queues full throws a
+ // non-disconnecting rpc server exception.
+ Mockito.reset(fcq);
+ try {
+ fcq.add(p0);
+ fail("didn't fail");
+ } catch (IllegalStateException ise) {
+ checkOverflowException(ise, RpcStatusProto.ERROR);
+ }
+ Mockito.verify(fcq, times(1)).offerQueue(0, p0);
+ Mockito.verify(fcq, times(1)).offerQueue(1, p0);
+ Mockito.verify(fcq, times(1)).offerQueue(2, p0);
+
+ // adding non-lowest priority with all queues full throws a
+ // non-disconnecting rpc server exception.
+ Mockito.reset(fcq);
+ try {
+ fcq.add(p1);
+ fail("didn't fail");
+ } catch (IllegalStateException ise) {
+ checkOverflowException(ise, RpcStatusProto.ERROR);
+ }
+ Mockito.verify(fcq, times(0)).offerQueue(0, p1);
+ Mockito.verify(fcq, times(1)).offerQueue(1, p1);
+ Mockito.verify(fcq, times(1)).offerQueue(2, p1);
+
+ // adding lowest priority with all queues full throws a
+ // fatal disconnecting rpc server exception.
+ Mockito.reset(fcq);
+ try {
+ fcq.add(p2);
+ fail("didn't fail");
+ } catch (IllegalStateException ise) {
+ checkOverflowException(ise, RpcStatusProto.FATAL);
+ }
+ Mockito.verify(fcq, times(0)).offerQueue(0, p2);
+ Mockito.verify(fcq, times(0)).offerQueue(1, p2);
+ Mockito.verify(fcq, times(1)).offerQueue(2, p2);
+ Mockito.reset(fcq);
+
+ // used to abort what would be a blocking operation.
+ Exception stopPuts = new RuntimeException();
+
+ // put should offer to all but last subqueue, only put to last subqueue.
+ Mockito.reset(fcq);
+ try {
+ doThrow(stopPuts).when(fcq).putQueue(anyInt(), anyObject());
+ fcq.put(p0);
+ fail("didn't fail");
+ } catch (Exception e) {
+ assertSame(stopPuts, e);
+ }
+ Mockito.verify(fcq, times(1)).offerQueue(0, p0);
+ Mockito.verify(fcq, times(1)).offerQueue(1, p0);
+ Mockito.verify(fcq, times(0)).offerQueue(2, p0); // expect put, not offer.
+ Mockito.verify(fcq, times(1)).putQueue(2, p0);
+
+ // put with lowest priority should not offer, just put.
+ Mockito.reset(fcq);
+ try {
+ doThrow(stopPuts).when(fcq).putQueue(anyInt(), anyObject());
+ fcq.put(p2);
+ fail("didn't fail");
+ } catch (Exception e) {
+ assertSame(stopPuts, e);
+ }
+ Mockito.verify(fcq, times(0)).offerQueue(0, p2);
+ Mockito.verify(fcq, times(0)).offerQueue(1, p2);
+ Mockito.verify(fcq, times(0)).offerQueue(2, p2);
+ Mockito.verify(fcq, times(1)).putQueue(2, p2);
+ }
+
+ private void checkOverflowException(Exception ex, RpcStatusProto status) {
+ // should be an overflow exception
+ assertTrue(ex.getClass().getName() + " != CallQueueOverflowException",
+ ex instanceof CallQueueOverflowException);
+ IOException ioe = ((CallQueueOverflowException)ex).getCause();
+ assertNotNull(ioe);
+ assertTrue(ioe.getClass().getName() + " != RpcServerException",
+ ioe instanceof RpcServerException);
+ RpcServerException rse = (RpcServerException)ioe;
+ // check error/fatal status and if it embeds a retriable ex.
+ assertEquals(status, rse.getRpcStatusProto());
+ assertTrue(rse.getClass().getName() + " != RetriableException",
+ rse.getCause() instanceof RetriableException);
+ }
+
//
// Ensure that FairCallQueue properly implements BlockingQueue
//
http://git-wip-us.apache.org/repos/asf/hadoop/blob/855e0477/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 3cc9916..166b205 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -1123,7 +1123,7 @@ public class TestRPC extends TestRpcBase {
return null;
}
}));
- verify(spy, timeout(500).times(i + 1)).offer(Mockito.<Call>anyObject());
+ verify(spy, timeout(500).times(i + 1)).add(Mockito.<Call>anyObject());
}
try {
proxy.sleep(null, newSleepRequest(100));
@@ -1194,7 +1194,7 @@ public class TestRPC extends TestRpcBase {
return null;
}
}));
- verify(spy, timeout(500).times(i + 1)).offer(Mockito.<Call>anyObject());
+ verify(spy, timeout(500).times(i + 1)).add(Mockito.<Call>anyObject());
}
// Start another sleep RPC call and verify the call is backed off due to
// avg response time(3s) exceeds threshold (2s).
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org