You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/06/08 20:06:18 UTC

[36/50] [abbrv] hadoop git commit: HADOOP-14035. Reduce fair call queue backoff's impact on clients. Contributed by Daryn Sharp.

HADOOP-14035. Reduce fair call queue backoff's impact on clients. Contributed by Daryn Sharp.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fd77c7f7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fd77c7f7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fd77c7f7

Branch: refs/heads/HDFS-7240
Commit: fd77c7f76bcadfb10f789a95700fd50972a2f292
Parents: 4c06897
Author: Kihwal Lee <ki...@apache.org>
Authored: Tue Jun 6 08:34:33 2017 -0500
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Thu Jun 8 10:44:51 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/ipc/CallQueueManager.java | 127 ++++++++++++++-
 .../org/apache/hadoop/ipc/FairCallQueue.java    |  90 ++++++++---
 .../main/java/org/apache/hadoop/ipc/Server.java |  22 ++-
 .../apache/hadoop/ipc/TestCallQueueManager.java |  78 +++++++++-
 .../apache/hadoop/ipc/TestFairCallQueue.java    | 156 +++++++++++++++++++
 .../java/org/apache/hadoop/ipc/TestRPC.java     |   4 +-
 6 files changed, 441 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd77c7f7/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index 50ed353..2764788 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -18,8 +18,12 @@
 
 package org.apache.hadoop.ipc;
 
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -28,11 +32,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Abstracts queue operations for different blocking queues.
  */
-public class CallQueueManager<E> {
+public class CallQueueManager<E extends Schedulable>
+    extends AbstractQueue<E> implements BlockingQueue<E> {
   public static final Log LOG = LogFactory.getLog(CallQueueManager.class);
   // Number of checkpoints for empty queue.
   private static final int CHECKPOINT_NUM = 20;
@@ -76,6 +84,15 @@ public class CallQueueManager<E> {
         maxQueueSize + " scheduler: " + schedulerClass);
   }
 
+  @VisibleForTesting // only!
+  CallQueueManager(BlockingQueue<E> queue, RpcScheduler scheduler,
+      boolean clientBackOffEnabled) {
+    this.putRef = new AtomicReference<BlockingQueue<E>>(queue);
+    this.takeRef = new AtomicReference<BlockingQueue<E>>(queue);
+    this.scheduler = scheduler;
+    this.clientBackOffEnabled = clientBackOffEnabled;
+  }
+
   private static <T extends RpcScheduler> T createScheduler(
       Class<T> theClass, int priorityLevels, String ns, Configuration conf) {
     // Used for custom, configurable scheduler
@@ -190,12 +207,40 @@ public class CallQueueManager<E> {
   }
 
   /**
-   * Insert e into the backing queue or block until we can.
+   * Insert e into the backing queue or block until we can.  If client
+   * backoff is enabled this method behaves like add which throws if
+   * the queue overflows.
    * If we block and the queue changes on us, we will insert while the
    * queue is drained.
    */
+  @Override
   public void put(E e) throws InterruptedException {
-    putRef.get().put(e);
+    if (!isClientBackoffEnabled()) {
+      putRef.get().put(e);
+    } else if (shouldBackOff(e)) {
+      throwBackoff();
+    } else {
+      add(e);
+    }
+  }
+
+  @Override
+  public boolean add(E e) {
+    try {
+      return putRef.get().add(e);
+    } catch (CallQueueOverflowException ex) {
+      // queue provided a custom exception that may control if the client
+      // should be disconnected.
+      throw ex;
+    } catch (IllegalStateException ise) {
+      throwBackoff();
+    }
+    return true;
+  }
+
+  // ideally this behavior should be controllable too.
+  private void throwBackoff() throws IllegalStateException {
+    throw CallQueueOverflowException.DISCONNECT;
   }
 
   /**
@@ -203,14 +248,37 @@ public class CallQueueManager<E> {
    * Return true if e is queued.
    * Return false if the queue is full.
    */
-  public boolean offer(E e) throws InterruptedException {
+  @Override
+  public boolean offer(E e) {
     return putRef.get().offer(e);
   }
 
+  @Override
+  public boolean offer(E e, long timeout, TimeUnit unit)
+      throws InterruptedException {
+    return putRef.get().offer(e, timeout, unit);
+  }
+
+  @Override
+  public E peek() {
+    return takeRef.get().peek();
+  }
+
+  @Override
+  public E poll() {
+    return takeRef.get().poll();
+  }
+
+  @Override
+  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+    return takeRef.get().poll(timeout, unit);
+  }
+
   /**
    * Retrieve an E from the backing queue or block until we can.
    * Guaranteed to return an element from the current queue.
    */
+  @Override
   public E take() throws InterruptedException {
     E e = null;
 
@@ -221,10 +289,16 @@ public class CallQueueManager<E> {
     return e;
   }
 
+  @Override
   public int size() {
     return takeRef.get().size();
   }
 
+  @Override
+  public int remainingCapacity() {
+    return takeRef.get().remainingCapacity();
+  }
+
   /**
    * Read the number of levels from the configuration.
    * This will affect the FairCallQueue's overall capacity.
@@ -308,4 +382,49 @@ public class CallQueueManager<E> {
   private String stringRepr(Object o) {
     return o.getClass().getName() + '@' + Integer.toHexString(o.hashCode());
   }
+
+  @Override
+  public int drainTo(Collection<? super E> c) {
+    return takeRef.get().drainTo(c);
+  }
+
+  @Override
+  public int drainTo(Collection<? super E> c, int maxElements) {
+    return takeRef.get().drainTo(c, maxElements);
+  }
+
+  @Override
+  public Iterator<E> iterator() {
+    return takeRef.get().iterator();
+  }
+
+  // exception that mimics the standard ISE thrown by blocking queues but
+  // embeds a rpc server exception for the client to retry and indicate
+  // if the client should be disconnected.
+  @SuppressWarnings("serial")
+  static class CallQueueOverflowException extends IllegalStateException {
+    private static String TOO_BUSY = "Server too busy";
+    static final CallQueueOverflowException KEEPALIVE =
+        new CallQueueOverflowException(
+            new RetriableException(TOO_BUSY),
+            RpcStatusProto.ERROR);
+    static final CallQueueOverflowException DISCONNECT =
+        new CallQueueOverflowException(
+            new RetriableException(TOO_BUSY + " - disconnecting"),
+            RpcStatusProto.FATAL);
+
+    CallQueueOverflowException(final IOException ioe,
+        final RpcStatusProto status) {
+      super("Queue full", new RpcServerException(ioe.getMessage(), ioe){
+        @Override
+        public RpcStatusProto getRpcStatusProto() {
+          return status;
+        }
+      });
+    }
+    @Override
+    public IOException getCause() {
+      return (IOException)super.getCause();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd77c7f7/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
index 820f24c..8bcaf05 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
@@ -36,6 +36,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
 import org.apache.hadoop.metrics2.util.MBeans;
 
 /**
@@ -134,45 +135,84 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
   /* AbstractQueue and BlockingQueue methods */
 
   /**
-   * Put and offer follow the same pattern:
+   * Add, put, and offer follow the same pattern:
    * 1. Get the assigned priorityLevel from the call by scheduler
    * 2. Get the nth sub-queue matching this priorityLevel
    * 3. delegate the call to this sub-queue.
    *
    * But differ in how they handle overflow:
-   * - Put will move on to the next queue until it lands on the last queue
+   * - Add will move on to the next queue, throw on last queue overflow
+   * - Put will move on to the next queue, block on last queue overflow
    * - Offer does not attempt other queues on overflow
    */
-  @Override
-  public void put(E e) throws InterruptedException {
-    int priorityLevel = e.getPriorityLevel();
 
-    final int numLevels = this.queues.size();
-    while (true) {
-      BlockingQueue<E> q = this.queues.get(priorityLevel);
-      boolean res = q.offer(e);
-      if (!res) {
-        // Update stats
-        this.overflowedCalls.get(priorityLevel).getAndIncrement();
-
-        // If we failed to insert, try again on the next level
-        priorityLevel++;
-
-        if (priorityLevel == numLevels) {
-          // That was the last one, we will block on put in the last queue
-          // Delete this line to drop the call
-          this.queues.get(priorityLevel-1).put(e);
-          break;
-        }
-      } else {
-        break;
-      }
+  @Override
+  public boolean add(E e) {
+    final int priorityLevel = e.getPriorityLevel();
+    // try offering to all queues.
+    if (!offerQueues(priorityLevel, e, true)) {
+      // only disconnect the lowest priority users that overflow the queue.
+      throw (priorityLevel == queues.size() - 1)
+          ? CallQueueOverflowException.DISCONNECT
+          : CallQueueOverflowException.KEEPALIVE;
     }
+    return true;
+  }
 
+  @Override
+  public void put(E e) throws InterruptedException {
+    final int priorityLevel = e.getPriorityLevel();
+    // try offering to all but last queue, put on last.
+    if (!offerQueues(priorityLevel, e, false)) {
+      putQueue(queues.size() - 1, e);
+    }
+  }
 
+  /**
+   * Put the element in a queue of a specific priority.
+   * @param priority - queue priority
+   * @param e - element to add
+   */
+  @VisibleForTesting
+  void putQueue(int priority, E e) throws InterruptedException {
+    queues.get(priority).put(e);
     signalNotEmpty();
   }
 
+  /**
+   * Offer the element to queue of a specific priority.
+   * @param priority - queue priority
+   * @param e - element to add
+   * @return boolean if added to the given queue
+   */
+  @VisibleForTesting
+  boolean offerQueue(int priority, E e) {
+    boolean ret = queues.get(priority).offer(e);
+    if (ret) {
+      signalNotEmpty();
+    }
+    return ret;
+  }
+
+  /**
+   * Offer the element to queue of the given or lower priority.
+   * @param priority - starting queue priority
+   * @param e - element to add
+   * @param includeLast - whether to attempt last queue
+   * @return boolean if added to a queue
+   */
+  private boolean offerQueues(int priority, E e, boolean includeLast) {
+    int lastPriority = queues.size() - (includeLast ? 1 : 2);
+    for (int i=priority; i <= lastPriority; i++) {
+      if (offerQueue(i, e)) {
+        return true;
+      }
+      // Update stats
+      overflowedCalls.get(i).getAndIncrement();
+    }
+    return false;
+  }
+
   @Override
   public boolean offer(E e, long timeout, TimeUnit unit)
       throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd77c7f7/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index f3b9a82..df108b8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
@@ -2479,7 +2480,9 @@ public abstract class Server {
       call.setPriorityLevel(callQueue.getPriorityLevel(call));
 
       try {
-        queueCall(call);
+        internalQueueCall(call);
+      } catch (RpcServerException rse) {
+        throw rse;
       } catch (IOException ioe) {
         throw new FatalRpcServerException(
             RpcErrorCodeProto.ERROR_RPC_SERVER, ioe);
@@ -2616,9 +2619,19 @@ public abstract class Server {
   }
 
   public void queueCall(Call call) throws IOException, InterruptedException {
-    if (!callQueue.isClientBackoffEnabled()) {
+    // external non-rpc calls don't need server exception wrapper.
+    try {
+      internalQueueCall(call);
+    } catch (RpcServerException rse) {
+      throw (IOException)rse.getCause();
+    }
+  }
+
+  private void internalQueueCall(Call call)
+      throws IOException, InterruptedException {
+    try {
       callQueue.put(call); // queue the call; maybe blocked here
-    } else if (callQueue.shouldBackOff(call) || !callQueue.offer(call)) {
+    } catch (CallQueueOverflowException cqe) {
       // If rpc scheduler indicates back off based on performance degradation
       // such as response time or rpc queue is full, we will ask the client
       // to back off by throwing RetriableException. Whether the client will
@@ -2626,7 +2639,8 @@ public abstract class Server {
       // For example, IPC clients using FailoverOnNetworkExceptionRetry handle
       // RetriableException.
       rpcMetrics.incrClientBackoff();
-      throw new RetriableException("Server is too busy.");
+      // unwrap retriable exception.
+      throw cqe.getCause();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd77c7f7/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
index 1211657..a5a0b00 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
@@ -19,8 +19,14 @@
 package org.apache.hadoop.ipc;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -29,8 +35,10 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestCallQueueManager {
   private CallQueueManager<FakeCall> manager;
@@ -311,11 +319,21 @@ public class TestCallQueueManager {
     assertEquals(totalCallsConsumed, totalCallsCreated);
   }
 
-  public static class ExceptionFakeCall {
+  public static class ExceptionFakeCall implements Schedulable {
     public ExceptionFakeCall() {
       throw new IllegalArgumentException("Exception caused by call queue " +
           "constructor.!!");
     }
+
+    @Override
+    public UserGroupInformation getUserGroupInformation() {
+      return null;
+    }
+
+    @Override
+    public int getPriorityLevel() {
+      return 0;
+    }
   }
 
   public static class ExceptionFakeScheduler {
@@ -359,4 +377,62 @@ public class TestCallQueueManager {
           .getMessage());
     }
   }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCallQueueOverflowExceptions() throws Exception {
+    RpcScheduler scheduler = Mockito.mock(RpcScheduler.class);
+    BlockingQueue<Schedulable> queue = Mockito.mock(BlockingQueue.class);
+    CallQueueManager<Schedulable> cqm =
+        Mockito.spy(new CallQueueManager<>(queue, scheduler, false));
+    Schedulable call = new FakeCall(0);
+
+    // call queue exceptions passed threw as-is
+    doThrow(CallQueueOverflowException.KEEPALIVE).when(queue).add(call);
+    try {
+      cqm.add(call);
+      fail("didn't throw");
+    } catch (CallQueueOverflowException cqe) {
+      assertSame(CallQueueOverflowException.KEEPALIVE, cqe);
+    }
+
+    // standard exception for blocking queue full converted to overflow
+    // exception.
+    doThrow(new IllegalStateException()).when(queue).add(call);
+    try {
+      cqm.add(call);
+      fail("didn't throw");
+    } catch (Exception ex) {
+      assertTrue(ex.toString(), ex instanceof CallQueueOverflowException);
+    }
+
+    // backoff disabled, put is put to queue.
+    reset(queue);
+    cqm.setClientBackoffEnabled(false);
+    cqm.put(call);
+    verify(queue, times(1)).put(call);
+    verify(queue, times(0)).add(call);
+
+    // backoff enabled, put is add to queue.
+    reset(queue);
+    cqm.setClientBackoffEnabled(true);
+    doReturn(Boolean.FALSE).when(cqm).shouldBackOff(call);
+    cqm.put(call);
+    verify(queue, times(0)).put(call);
+    verify(queue, times(1)).add(call);
+    reset(queue);
+
+    // backoff is enabled, put + scheduler backoff = overflow exception.
+    reset(queue);
+    cqm.setClientBackoffEnabled(true);
+    doReturn(Boolean.TRUE).when(cqm).shouldBackOff(call);
+    try {
+      cqm.put(call);
+      fail("didn't fail");
+    } catch (Exception ex) {
+      assertTrue(ex.toString(), ex instanceof CallQueueOverflowException);
+    }
+    verify(queue, times(0)).put(call);
+    verify(queue, times(0)).add(call);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd77c7f7/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
index 901a771..6b1cd29 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
@@ -18,13 +18,19 @@
 
 package org.apache.hadoop.ipc;
 
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyObject;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
 
 import junit.framework.TestCase;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
+
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -34,7 +40,10 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 
 public class TestFairCallQueue extends TestCase {
   private FairCallQueue<Schedulable> fcq;
@@ -133,6 +142,153 @@ public class TestFairCallQueue extends TestCase {
     assertNull(fcq.poll());
   }
 
+  @SuppressWarnings("unchecked") // for mock reset.
+  @Test
+  public void testInsertion() throws Exception {
+    Configuration conf = new Configuration();
+    // 3 queues, 2 slots each.
+    fcq = Mockito.spy(new FairCallQueue<Schedulable>(3, 6, "ns", conf));
+
+    Schedulable p0 = mockCall("a", 0);
+    Schedulable p1 = mockCall("b", 1);
+    Schedulable p2 = mockCall("c", 2);
+
+    // add to first queue.
+    Mockito.reset(fcq);
+    fcq.add(p0);
+    Mockito.verify(fcq, times(1)).offerQueue(0, p0);
+    Mockito.verify(fcq, times(0)).offerQueue(1, p0);
+    Mockito.verify(fcq, times(0)).offerQueue(2, p0);
+    Mockito.reset(fcq);
+    // 0:x- 1:-- 2:--
+
+    // add to second queue.
+    Mockito.reset(fcq);
+    fcq.add(p1);
+    Mockito.verify(fcq, times(0)).offerQueue(0, p1);
+    Mockito.verify(fcq, times(1)).offerQueue(1, p1);
+    Mockito.verify(fcq, times(0)).offerQueue(2, p1);
+    // 0:x- 1:x- 2:--
+
+    // add to first queue.
+    Mockito.reset(fcq);
+    fcq.add(p0);
+    Mockito.verify(fcq, times(1)).offerQueue(0, p0);
+    Mockito.verify(fcq, times(0)).offerQueue(1, p0);
+    Mockito.verify(fcq, times(0)).offerQueue(2, p0);
+    // 0:xx 1:x- 2:--
+
+    // add to first full queue spills over to second.
+    Mockito.reset(fcq);
+    fcq.add(p0);
+    Mockito.verify(fcq, times(1)).offerQueue(0, p0);
+    Mockito.verify(fcq, times(1)).offerQueue(1, p0);
+    Mockito.verify(fcq, times(0)).offerQueue(2, p0);
+    // 0:xx 1:xx 2:--
+
+    // add to second full queue spills over to third.
+    Mockito.reset(fcq);
+    fcq.add(p1);
+    Mockito.verify(fcq, times(0)).offerQueue(0, p1);
+    Mockito.verify(fcq, times(1)).offerQueue(1, p1);
+    Mockito.verify(fcq, times(1)).offerQueue(2, p1);
+    // 0:xx 1:xx 2:x-
+
+    // add to first and second full queue spills over to third.
+    Mockito.reset(fcq);
+    fcq.add(p0);
+    Mockito.verify(fcq, times(1)).offerQueue(0, p0);
+    Mockito.verify(fcq, times(1)).offerQueue(1, p0);
+    Mockito.verify(fcq, times(1)).offerQueue(2, p0);
+    // 0:xx 1:xx 2:xx
+
+    // adding non-lowest priority with all queues full throws a
+    // non-disconnecting rpc server exception.
+    Mockito.reset(fcq);
+    try {
+      fcq.add(p0);
+      fail("didn't fail");
+    } catch (IllegalStateException ise) {
+      checkOverflowException(ise, RpcStatusProto.ERROR);
+    }
+    Mockito.verify(fcq, times(1)).offerQueue(0, p0);
+    Mockito.verify(fcq, times(1)).offerQueue(1, p0);
+    Mockito.verify(fcq, times(1)).offerQueue(2, p0);
+
+    // adding non-lowest priority with all queues full throws a
+    // non-disconnecting rpc server exception.
+    Mockito.reset(fcq);
+    try {
+      fcq.add(p1);
+      fail("didn't fail");
+    } catch (IllegalStateException ise) {
+      checkOverflowException(ise, RpcStatusProto.ERROR);
+    }
+    Mockito.verify(fcq, times(0)).offerQueue(0, p1);
+    Mockito.verify(fcq, times(1)).offerQueue(1, p1);
+    Mockito.verify(fcq, times(1)).offerQueue(2, p1);
+
+    // adding lowest priority with all queues full throws a
+    // fatal disconnecting rpc server exception.
+    Mockito.reset(fcq);
+    try {
+      fcq.add(p2);
+      fail("didn't fail");
+    } catch (IllegalStateException ise) {
+      checkOverflowException(ise, RpcStatusProto.FATAL);
+    }
+    Mockito.verify(fcq, times(0)).offerQueue(0, p2);
+    Mockito.verify(fcq, times(0)).offerQueue(1, p2);
+    Mockito.verify(fcq, times(1)).offerQueue(2, p2);
+    Mockito.reset(fcq);
+
+    // used to abort what would be a blocking operation.
+    Exception stopPuts = new RuntimeException();
+
+    // put should offer to all but last subqueue, only put to last subqueue.
+    Mockito.reset(fcq);
+    try {
+      doThrow(stopPuts).when(fcq).putQueue(anyInt(), anyObject());
+      fcq.put(p0);
+      fail("didn't fail");
+    } catch (Exception e) {
+      assertSame(stopPuts, e);
+    }
+    Mockito.verify(fcq, times(1)).offerQueue(0, p0);
+    Mockito.verify(fcq, times(1)).offerQueue(1, p0);
+    Mockito.verify(fcq, times(0)).offerQueue(2, p0); // expect put, not offer.
+    Mockito.verify(fcq, times(1)).putQueue(2, p0);
+
+    // put with lowest priority should not offer, just put.
+    Mockito.reset(fcq);
+    try {
+      doThrow(stopPuts).when(fcq).putQueue(anyInt(), anyObject());
+      fcq.put(p2);
+      fail("didn't fail");
+    } catch (Exception e) {
+      assertSame(stopPuts, e);
+    }
+    Mockito.verify(fcq, times(0)).offerQueue(0, p2);
+    Mockito.verify(fcq, times(0)).offerQueue(1, p2);
+    Mockito.verify(fcq, times(0)).offerQueue(2, p2);
+    Mockito.verify(fcq, times(1)).putQueue(2, p2);
+  }
+
+  private void checkOverflowException(Exception ex, RpcStatusProto status) {
+    // should be an overflow exception
+    assertTrue(ex.getClass().getName() + " != CallQueueOverflowException",
+        ex instanceof CallQueueOverflowException);
+    IOException ioe = ((CallQueueOverflowException)ex).getCause();
+    assertNotNull(ioe);
+    assertTrue(ioe.getClass().getName() + " != RpcServerException",
+        ioe instanceof RpcServerException);
+    RpcServerException rse = (RpcServerException)ioe;
+    // check error/fatal status and if it embeds a retriable ex.
+    assertEquals(status, rse.getRpcStatusProto());
+    assertTrue(rse.getClass().getName() + " != RetriableException",
+        rse.getCause() instanceof RetriableException);
+  }
+
   //
   // Ensure that FairCallQueue properly implements BlockingQueue
   //

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd77c7f7/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 3cc9916..166b205 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -1123,7 +1123,7 @@ public class TestRPC extends TestRpcBase {
                 return null;
               }
             }));
-        verify(spy, timeout(500).times(i + 1)).offer(Mockito.<Call>anyObject());
+        verify(spy, timeout(500).times(i + 1)).add(Mockito.<Call>anyObject());
       }
       try {
         proxy.sleep(null, newSleepRequest(100));
@@ -1194,7 +1194,7 @@ public class TestRPC extends TestRpcBase {
                 return null;
               }
             }));
-        verify(spy, timeout(500).times(i + 1)).offer(Mockito.<Call>anyObject());
+        verify(spy, timeout(500).times(i + 1)).add(Mockito.<Call>anyObject());
       }
       // Start another sleep RPC call and verify the call is backed off due to
       // avg response time(3s) exceeds threshold (2s).


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org