You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2016/06/25 07:53:38 UTC

[2/2] hadoop git commit: HADOOP-13227. AsyncCallHandler should use an event driven architecture to handle async calls.

HADOOP-13227. AsyncCallHandler should use an event driven architecture to handle async calls.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ce788c20
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ce788c20
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ce788c20

Branch: refs/heads/branch-2.8
Commit: ce788c207fe86ba17a0f1da971cad01ce3e15a7b
Parents: e251654
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Sat Jun 25 15:45:16 2016 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Sat Jun 25 15:51:45 2016 +0800

----------------------------------------------------------------------
 .../hadoop/io/retry/AsyncCallHandler.java       | 104 ++++++----
 .../org/apache/hadoop/io/retry/CallReturn.java  |   3 +
 .../hadoop/io/retry/RetryInvocationHandler.java | 194 ++++++++++++-------
 .../org/apache/hadoop/io/retry/RetryPolicy.java |   1 +
 .../main/java/org/apache/hadoop/ipc/Client.java |  15 +-
 .../hadoop/io/retry/TestDefaultRetryPolicy.java |  14 +-
 .../org/apache/hadoop/ipc/TestAsyncIPC.java     |   4 +-
 .../java/org/apache/hadoop/ipc/TestIPC.java     |   6 +-
 8 files changed, 224 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce788c20/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
index 5a03b03..69e1233 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.io.retry;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -27,17 +28,21 @@ import org.apache.hadoop.util.concurrent.AsyncGet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.InterruptedIOException;
 import java.lang.reflect.Method;
-import java.util.LinkedList;
+import java.util.Iterator;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 /** Handle async calls. */
 @InterfaceAudience.Private
 public class AsyncCallHandler {
-  static final Logger LOG = LoggerFactory.getLogger(AsyncCallHandler.class);
+  public static final Logger LOG = LoggerFactory.getLogger(
+      AsyncCallHandler.class);
 
   private static final ThreadLocal<AsyncGet<?, Exception>>
       LOWER_LAYER_ASYNC_RETURN = new ThreadLocal<>();
@@ -73,35 +78,34 @@ public class AsyncCallHandler {
 
   /** A simple concurrent queue which keeping track the empty start time. */
   static class ConcurrentQueue<T> {
-    private final Queue<T> queue = new LinkedList<>();
-    private long emptyStartTime = Time.monotonicNow();
+    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
+    private final AtomicLong emptyStartTime
+        = new AtomicLong(Time.monotonicNow());
 
-    synchronized int size() {
-      return queue.size();
+    Iterator<T> iterator() {
+      return queue.iterator();
     }
 
     /** Is the queue empty for more than the given time in millisecond? */
-    synchronized boolean isEmpty(long time) {
-      return queue.isEmpty() && Time.monotonicNow() - emptyStartTime > time;
+    boolean isEmpty(long time) {
+      return Time.monotonicNow() - emptyStartTime.get() > time
+          && queue.isEmpty();
     }
 
-    synchronized void offer(T c) {
+    void offer(T c) {
       final boolean added = queue.offer(c);
       Preconditions.checkState(added);
     }
 
-    synchronized T poll() {
-      Preconditions.checkState(!queue.isEmpty());
-      final T t = queue.poll();
+    void checkEmpty() {
       if (queue.isEmpty()) {
-        emptyStartTime = Time.monotonicNow();
+        emptyStartTime.set(Time.monotonicNow());
       }
-      return t;
     }
   }
 
   /** A queue for handling async calls. */
-  static class AsyncCallQueue {
+  class AsyncCallQueue {
     private final ConcurrentQueue<AsyncCall> queue = new ConcurrentQueue<>();
     private final Processor processor = new Processor();
 
@@ -113,20 +117,29 @@ public class AsyncCallHandler {
       processor.tryStart();
     }
 
-    void checkCalls() {
-      final int size = queue.size();
-      for (int i = 0; i < size; i++) {
-        final AsyncCall c = queue.poll();
-        if (!c.isDone()) {
-          queue.offer(c); // the call is not done yet, add it back.
+    long checkCalls() {
+      final long startTime = Time.monotonicNow();
+      long minWaitTime = Processor.MAX_WAIT_PERIOD;
+
+      for (final Iterator<AsyncCall> i = queue.iterator(); i.hasNext();) {
+        final AsyncCall c = i.next();
+        if (c.isDone()) {
+          i.remove(); // the call is done, remove it from the queue.
+          queue.checkEmpty();
+        } else {
+          final Long waitTime = c.getWaitTime(startTime);
+          if (waitTime != null && waitTime > 0 && waitTime < minWaitTime) {
+            minWaitTime = waitTime;
+          }
         }
       }
+      return minWaitTime;
     }
 
     /** Process the async calls in the queue. */
     private class Processor {
-      static final long GRACE_PERIOD = 10*1000L;
-      static final long SLEEP_PERIOD = 100L;
+      static final long GRACE_PERIOD = 3*1000L;
+      static final long MAX_WAIT_PERIOD = 100L;
 
       private final AtomicReference<Thread> running = new AtomicReference<>();
 
@@ -141,15 +154,16 @@ public class AsyncCallHandler {
             @Override
             public void run() {
               for (; isRunning(this);) {
+                final long waitTime = checkCalls();
+                tryStop(this);
+
                 try {
-                  Thread.sleep(SLEEP_PERIOD);
+                  synchronized (AsyncCallHandler.this) {
+                    AsyncCallHandler.this.wait(waitTime);
+                  }
                 } catch (InterruptedException e) {
                   kill(this);
-                  return;
                 }
-
-                checkCalls();
-                tryStop(this);
               }
             }
           };
@@ -215,10 +229,9 @@ public class AsyncCallHandler {
     private AsyncGet<?, Exception> lowerLayerAsyncGet;
 
     AsyncCall(Method method, Object[] args, boolean isRpc, int callId,
-              RetryInvocationHandler.Counters counters,
               RetryInvocationHandler<?> retryInvocationHandler,
               AsyncCallHandler asyncCallHandler) {
-      super(method, args, isRpc, callId, counters, retryInvocationHandler);
+      super(method, args, isRpc, callId, retryInvocationHandler);
 
       this.asyncCallHandler = asyncCallHandler;
     }
@@ -226,6 +239,7 @@ public class AsyncCallHandler {
     /** @return true if the call is done; otherwise, return false. */
     boolean isDone() {
       final CallReturn r = invokeOnce();
+      LOG.debug("#{}: {}", getCallId(), r.getState());
       switch (r.getState()) {
         case RETURNED:
         case EXCEPTION:
@@ -234,6 +248,7 @@ public class AsyncCallHandler {
         case RETRY:
           invokeOnce();
           break;
+        case WAIT_RETRY:
         case ASYNC_CALL_IN_PROGRESS:
         case ASYNC_INVOKED:
           // nothing to do
@@ -245,12 +260,24 @@ public class AsyncCallHandler {
     }
 
     @Override
+    CallReturn processWaitTimeAndRetryInfo() {
+      final Long waitTime = getWaitTime(Time.monotonicNow());
+      LOG.trace("#{} processRetryInfo: waitTime={}", getCallId(), waitTime);
+      if (waitTime != null && waitTime > 0) {
+        return CallReturn.WAIT_RETRY;
+      }
+      processRetryInfo();
+      return CallReturn.RETRY;
+    }
+
+    @Override
     CallReturn invoke() throws Throwable {
       LOG.debug("{}.invoke {}", getClass().getSimpleName(), this);
       if (lowerLayerAsyncGet != null) {
         // async call was submitted early, check the lower level async call
         final boolean isDone = lowerLayerAsyncGet.isDone();
-        LOG.trace("invoke: lowerLayerAsyncGet.isDone()? {}", isDone);
+        LOG.trace("#{} invoke: lowerLayerAsyncGet.isDone()? {}",
+            getCallId(), isDone);
         if (!isDone) {
           return CallReturn.ASYNC_CALL_IN_PROGRESS;
         }
@@ -262,7 +289,7 @@ public class AsyncCallHandler {
       }
 
       // submit a new async call
-      LOG.trace("invoke: ASYNC_INVOKED");
+      LOG.trace("#{} invoke: ASYNC_INVOKED", getCallId());
       final boolean mode = Client.isAsynchronousMode();
       try {
         Client.setAsynchronousMode(true);
@@ -271,9 +298,9 @@ public class AsyncCallHandler {
         Preconditions.checkState(r == null);
         lowerLayerAsyncGet = getLowerLayerAsyncReturn();
 
-        if (counters.isZeros()) {
+        if (getCounters().isZeros()) {
           // first async attempt, initialize
-          LOG.trace("invoke: initAsyncCall");
+          LOG.trace("#{} invoke: initAsyncCall", getCallId());
           asyncCallHandler.initAsyncCall(this, asyncCallReturn);
         }
         return CallReturn.ASYNC_INVOKED;
@@ -287,9 +314,9 @@ public class AsyncCallHandler {
   private volatile boolean hasSuccessfulCall = false;
 
   AsyncCall newAsyncCall(Method method, Object[] args, boolean isRpc,
-                         int callId, RetryInvocationHandler.Counters counters,
+                         int callId,
                          RetryInvocationHandler<?> retryInvocationHandler) {
-    return new AsyncCall(method, args, isRpc, callId, counters,
+    return new AsyncCall(method, args, isRpc, callId,
         retryInvocationHandler, this);
   }
 
@@ -318,4 +345,9 @@ public class AsyncCallHandler {
     };
     ASYNC_RETURN.set(asyncGet);
   }
+
+  @VisibleForTesting
+  public static long getGracePeriod() {
+    return AsyncCallQueue.Processor.GRACE_PERIOD;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce788c20/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
index 943725c..022b785 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
@@ -29,6 +29,8 @@ class CallReturn {
     EXCEPTION,
     /** Call should be retried according to the {@link RetryPolicy}. */
     RETRY,
+    /** Call should wait and then retry according to the {@link RetryPolicy}. */
+    WAIT_RETRY,
     /** Call, which is async, is still in progress. */
     ASYNC_CALL_IN_PROGRESS,
     /** Call, which is async, just has been invoked. */
@@ -39,6 +41,7 @@ class CallReturn {
       State.ASYNC_CALL_IN_PROGRESS);
   static final CallReturn ASYNC_INVOKED = new CallReturn(State.ASYNC_INVOKED);
   static final CallReturn RETRY = new CallReturn(State.RETRY);
+  static final CallReturn WAIT_RETRY = new CallReturn(State.WAIT_RETRY);
 
   private final Object returnValue;
   private final Throwable thrown;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce788c20/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
index 5198c0d..7bd3a15 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
@@ -18,13 +18,14 @@
 package org.apache.hadoop.io.retry;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.ipc.Client.ConnectionId;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -41,33 +42,51 @@ import java.util.Map;
  */
 @InterfaceAudience.Private
 public class RetryInvocationHandler<T> implements RpcInvocationHandler {
-  public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
+  public static final Logger LOG = LoggerFactory.getLogger(
+      RetryInvocationHandler.class);
 
   static class Call {
     private final Method method;
     private final Object[] args;
     private final boolean isRpc;
     private final int callId;
-    final Counters counters;
+    private final Counters counters = new Counters();
 
     private final RetryPolicy retryPolicy;
     private final RetryInvocationHandler<?> retryInvocationHandler;
 
+    private RetryInfo retryInfo;
+
     Call(Method method, Object[] args, boolean isRpc, int callId,
-         Counters counters, RetryInvocationHandler<?> retryInvocationHandler) {
+         RetryInvocationHandler<?> retryInvocationHandler) {
       this.method = method;
       this.args = args;
       this.isRpc = isRpc;
       this.callId = callId;
-      this.counters = counters;
 
       this.retryPolicy = retryInvocationHandler.getRetryPolicy(method);
       this.retryInvocationHandler = retryInvocationHandler;
     }
 
+    int getCallId() {
+      return callId;
+    }
+
+    Counters getCounters() {
+      return counters;
+    }
+
+    synchronized Long getWaitTime(final long now) {
+      return retryInfo == null? null: retryInfo.retryTime - now;
+    }
+
     /** Invoke the call once without retrying. */
     synchronized CallReturn invokeOnce() {
       try {
+        if (retryInfo != null) {
+          return processWaitTimeAndRetryInfo();
+        }
+
         // The number of times this invocation handler has ever been failed over
         // before this method invocation attempt. Used to prevent concurrent
         // failed method invocations from triggering multiple failover attempts.
@@ -76,28 +95,70 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
           return invoke();
         } catch (Exception e) {
           if (LOG.isTraceEnabled()) {
-            LOG.trace(this, e);
+            LOG.trace(toString(), e);
           }
           if (Thread.currentThread().isInterrupted()) {
             // If interrupted, do not retry.
             throw e;
           }
-          retryInvocationHandler.handleException(
-              method, retryPolicy, failoverCount, counters, e);
-          return CallReturn.RETRY;
+
+          retryInfo = retryInvocationHandler.handleException(
+              method, callId, retryPolicy, counters, failoverCount, e);
+          return processWaitTimeAndRetryInfo();
         }
       } catch(Throwable t) {
         return new CallReturn(t);
       }
     }
 
+    /**
+     * It first processes the wait time, if there is any,
+     * and then invokes {@link #processRetryInfo()}.
+     *
+     * If the wait time is positive, it either sleeps for synchronous calls
+     * or immediately returns for asynchronous calls.
+     *
+     * @return {@link CallReturn#RETRY} if the retryInfo is processed;
+     *         otherwise, return {@link CallReturn#WAIT_RETRY}.
+     */
+    CallReturn processWaitTimeAndRetryInfo() throws InterruptedIOException {
+      final Long waitTime = getWaitTime(Time.monotonicNow());
+      LOG.trace("#{} processRetryInfo: retryInfo={}, waitTime={}",
+          callId, retryInfo, waitTime);
+      if (waitTime != null && waitTime > 0) {
+        try {
+          Thread.sleep(retryInfo.delay);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          LOG.warn("Interrupted while waiting to retry", e);
+          InterruptedIOException intIOE = new InterruptedIOException(
+              "Retry interrupted");
+          intIOE.initCause(e);
+          throw intIOE;
+        }
+      }
+      processRetryInfo();
+      return CallReturn.RETRY;
+    }
+
+    synchronized void processRetryInfo() {
+      counters.retries++;
+      if (retryInfo.isFailover()) {
+        retryInvocationHandler.proxyDescriptor.failover(
+            retryInfo.expectedFailoverCount, method, callId);
+        counters.failovers++;
+      }
+      retryInfo = null;
+    }
+
     CallReturn invoke() throws Throwable {
       return new CallReturn(invokeMethod());
     }
 
     Object invokeMethod() throws Throwable {
       if (isRpc) {
-        Client.setCallIdAndRetryCount(callId, counters.retries);
+        Client.setCallIdAndRetryCount(callId, counters.retries,
+            retryInvocationHandler.asyncCallHandler);
       }
       return retryInvocationHandler.invokeMethod(method, args);
     }
@@ -146,15 +207,16 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
       return failoverCount;
     }
 
-    synchronized void failover(long expectedFailoverCount, Method method) {
+    synchronized void failover(long expectedFailoverCount, Method method,
+                               int callId) {
       // Make sure that concurrent failed invocations only cause a single
       // actual failover.
       if (failoverCount == expectedFailoverCount) {
         fpp.performFailover(proxyInfo.proxy);
         failoverCount++;
       } else {
-        LOG.warn("A failover has occurred since the start of "
-            + proxyInfo.getString(method.getName()));
+        LOG.warn("A failover has occurred since the start of call #" + callId
+            + " " + proxyInfo.getString(method.getName()));
       }
       proxyInfo = fpp.getProxy();
     }
@@ -172,22 +234,33 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
   }
 
   private static class RetryInfo {
+    private final long retryTime;
     private final long delay;
-    private final RetryAction failover;
-    private final RetryAction fail;
+    private final RetryAction action;
+    private final long expectedFailoverCount;
 
-    RetryInfo(long delay, RetryAction failover, RetryAction fail) {
+    RetryInfo(long delay, RetryAction action, long expectedFailoverCount) {
       this.delay = delay;
-      this.failover = failover;
-      this.fail = fail;
+      this.retryTime = Time.monotonicNow() + delay;
+      this.action = action;
+      this.expectedFailoverCount = expectedFailoverCount;
+    }
+
+    boolean isFailover() {
+      return action != null
+          && action.action ==  RetryAction.RetryDecision.FAILOVER_AND_RETRY;
+    }
+
+    boolean isFail() {
+      return action != null
+          && action.action ==  RetryAction.RetryDecision.FAIL;
     }
 
     static RetryInfo newRetryInfo(RetryPolicy policy, Exception e,
-        Counters counters, boolean idempotentOrAtMostOnce) throws Exception {
+        Counters counters, boolean idempotentOrAtMostOnce,
+        long expectedFailoverCount) throws Exception {
+      RetryAction max = null;
       long maxRetryDelay = 0;
-      RetryAction failover = null;
-      RetryAction retry = null;
-      RetryAction fail = null;
 
       final Iterable<Exception> exceptions = e instanceof MultiException ?
           ((MultiException) e).getExceptions().values()
@@ -195,23 +268,19 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
       for (Exception exception : exceptions) {
         final RetryAction a = policy.shouldRetry(exception,
             counters.retries, counters.failovers, idempotentOrAtMostOnce);
-        if (a.action == RetryAction.RetryDecision.FAIL) {
-          fail = a;
-        } else {
+        if (a.action != RetryAction.RetryDecision.FAIL) {
           // must be a retry or failover
-          if (a.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) {
-            failover = a;
-          } else {
-            retry = a;
-          }
           if (a.delayMillis > maxRetryDelay) {
             maxRetryDelay = a.delayMillis;
           }
         }
+
+        if (max == null || max.action.compareTo(a.action) < 0) {
+          max = a;
+        }
       }
 
-      return new RetryInfo(maxRetryDelay, failover,
-          failover == null && retry == null? fail: null);
+      return new RetryInfo(maxRetryDelay, max, expectedFailoverCount);
     }
   }
 
@@ -246,13 +315,12 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
     return proxyDescriptor.getFailoverCount();
   }
 
-  private Call newCall(Method method, Object[] args, boolean isRpc, int callId,
-                       Counters counters) {
+  private Call newCall(Method method, Object[] args, boolean isRpc,
+                       int callId) {
     if (Client.isAsynchronousMode()) {
-      return asyncCallHandler.newAsyncCall(method, args, isRpc, callId,
-          counters, this);
+      return asyncCallHandler.newAsyncCall(method, args, isRpc, callId, this);
     } else {
-      return new Call(method, args, isRpc, callId, counters, this);
+      return new Call(method, args, isRpc, callId, this);
     }
   }
 
@@ -261,9 +329,8 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
       throws Throwable {
     final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy());
     final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
-    final Counters counters = new Counters();
 
-    final Call call = newCall(method, args, isRpc, callId, counters);
+    final Call call = newCall(method, args, isRpc, callId);
     while (true) {
       final CallReturn c = call.invokeOnce();
       final CallReturn.State state = c.getState();
@@ -275,45 +342,24 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
     }
   }
 
-  private void handleException(final Method method, final RetryPolicy policy,
-      final long expectedFailoverCount, final Counters counters,
-      final Exception ex) throws Exception {
-    final RetryInfo retryInfo = RetryInfo.newRetryInfo(policy, ex, counters,
-        proxyDescriptor.idempotentOrAtMostOnce(method));
-    counters.retries++;
-
-    if (retryInfo.fail != null) {
+  private RetryInfo handleException(final Method method, final int callId,
+      final RetryPolicy policy, final Counters counters,
+      final long expectFailoverCount, final Exception e) throws Exception {
+    final RetryInfo retryInfo = RetryInfo.newRetryInfo(policy, e,
+        counters, proxyDescriptor.idempotentOrAtMostOnce(method),
+        expectFailoverCount);
+    if (retryInfo.isFail()) {
       // fail.
-      if (retryInfo.fail.reason != null) {
-        LOG.warn("Exception while invoking "
+      if (retryInfo.action.reason != null) {
+        LOG.warn("Exception while invoking call #" + callId + " "
             + proxyDescriptor.getProxyInfo().getString(method.getName())
-            + ". Not retrying because " + retryInfo.fail.reason, ex);
-      }
-      throw ex;
-    }
-
-    // retry
-    final boolean isFailover = retryInfo.failover != null;
-
-    log(method, isFailover, counters.failovers, retryInfo.delay, ex);
-
-    if (retryInfo.delay > 0) {
-      try {
-        Thread.sleep(retryInfo.delay);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        LOG.warn("Interrupted while waiting to retry", e);
-        InterruptedIOException intIOE = new InterruptedIOException(
-            "Retry interrupted");
-        intIOE.initCause(e);
-        throw intIOE;
+            + ". Not retrying because " + retryInfo.action.reason, e);
       }
+      throw e;
     }
 
-    if (isFailover) {
-      proxyDescriptor.failover(expectedFailoverCount, method);
-      counters.failovers++;
-    }
+    log(method, retryInfo.isFailover(), counters.failovers, retryInfo.delay, e);
+    return retryInfo;
   }
 
   private void log(final Method method, final boolean isFailover,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce788c20/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java
index f3e2bd1..20c0307 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java
@@ -67,6 +67,7 @@ public interface RetryPolicy {
     }
     
     public enum RetryDecision {
+      // Ordering: FAIL < RETRY < FAILOVER_AND_RETRY.
       FAIL,
       RETRY,
       FAILOVER_AND_RETRY

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce788c20/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 2820c93..da31ee4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -93,6 +93,8 @@ public class Client implements AutoCloseable {
 
   private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
   private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
+  private static final ThreadLocal<Object> EXTERNAL_CALL_HANDLER
+      = new ThreadLocal<>();
   private static final ThreadLocal<AsyncGet<? extends Writable, IOException>>
       ASYNC_RPC_RESPONSE = new ThreadLocal<>();
   private static final ThreadLocal<Boolean> asynchronousMode =
@@ -111,13 +113,15 @@ public class Client implements AutoCloseable {
   }
 
   /** Set call id and retry count for the next call. */
-  public static void setCallIdAndRetryCount(int cid, int rc) {
+  public static void setCallIdAndRetryCount(int cid, int rc,
+                                            Object externalHandler) {
     Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID);
     Preconditions.checkState(callId.get() == null);
     Preconditions.checkArgument(rc != RpcConstants.INVALID_RETRY_COUNT);
 
     callId.set(cid);
     retryCount.set(rc);
+    EXTERNAL_CALL_HANDLER.set(externalHandler);
   }
 
   private ConcurrentMap<ConnectionId, Connection> connections =
@@ -335,6 +339,7 @@ public class Client implements AutoCloseable {
     IOException error;          // exception, null if success
     final RPC.RpcKind rpcKind;      // Rpc EngineKind
     boolean done;               // true when call is done
+    private final Object externalHandler;
 
     private Call(RPC.RpcKind rpcKind, Writable param) {
       this.rpcKind = rpcKind;
@@ -354,6 +359,8 @@ public class Client implements AutoCloseable {
       } else {
         this.retry = rc;
       }
+
+      this.externalHandler = EXTERNAL_CALL_HANDLER.get();
     }
 
     @Override
@@ -366,6 +373,12 @@ public class Client implements AutoCloseable {
     protected synchronized void callComplete() {
       this.done = true;
       notify();                                 // notify caller
+
+      if (externalHandler != null) {
+        synchronized (externalHandler) {
+          externalHandler.notify();
+        }
+      }
     }
 
     /** Set the exception when there is an error.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce788c20/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestDefaultRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestDefaultRetryPolicy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestDefaultRetryPolicy.java
index 8a61c04..56dec3a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestDefaultRetryPolicy.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestDefaultRetryPolicy.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.io.retry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RetriableException;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
@@ -35,7 +36,18 @@ import static org.junit.Assert.assertThat;
  */
 public class TestDefaultRetryPolicy {
   @Rule
-  public Timeout timeout = new Timeout(300000);
+  public Timeout timeout = new Timeout(30000);
+
+  /** Verify FAIL < RETRY < FAILOVER_AND_RETRY. */
+  @Test
+  public void testRetryDecisionOrdering() throws Exception {
+    Assert.assertTrue(RetryPolicy.RetryAction.RetryDecision.FAIL.compareTo(
+        RetryPolicy.RetryAction.RetryDecision.RETRY) < 0);
+    Assert.assertTrue(RetryPolicy.RetryAction.RetryDecision.RETRY.compareTo(
+        RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY) < 0);
+    Assert.assertTrue(RetryPolicy.RetryAction.RetryDecision.FAIL.compareTo(
+        RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY) < 0);
+  }
 
   /**
    * Verify that the default retry policy correctly retries

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce788c20/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index ef27e12..8fc852a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -370,7 +370,7 @@ public class TestAsyncIPC {
       Call createCall(RpcKind rpcKind, Writable rpcRequest) {
         // Set different call id and retry count for the next call
         Client.setCallIdAndRetryCount(Client.nextCallId(),
-            TestIPC.RANDOM.nextInt(255));
+            TestIPC.RANDOM.nextInt(255), null);
 
         final Call call = super.createCall(rpcKind, rpcRequest);
 
@@ -424,7 +424,7 @@ public class TestAsyncIPC {
     final int retryCount = 255;
     // Override client to store the call id
     final Client client = new Client(LongWritable.class, conf);
-    Client.setCallIdAndRetryCount(Client.nextCallId(), retryCount);
+    Client.setCallIdAndRetryCount(Client.nextCallId(), retryCount, null);
 
     // Attach a listener that tracks every call ID received by the server.
     final TestServer server = new TestIPC.TestServer(1, false, conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce788c20/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
index 2e93ceb..d462781 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
@@ -1166,7 +1166,7 @@ public class TestIPC {
       retryProxy.dummyRun();
       Assert.assertEquals(TestInvocationHandler.retry, totalRetry + 1);
     } finally {
-      Client.setCallIdAndRetryCount(0, 0);
+      Client.setCallIdAndRetryCount(0, 0, null);
       client.stop();
       server.stop();
     }
@@ -1199,7 +1199,7 @@ public class TestIPC {
     } finally {
       // Check if dummyRun called only once
       Assert.assertEquals(handler.invocations, 1);
-      Client.setCallIdAndRetryCount(0, 0);
+      Client.setCallIdAndRetryCount(0, 0, null);
       client.stop();
       server.stop();
     }
@@ -1244,7 +1244,7 @@ public class TestIPC {
     final int retryCount = 255;
     // Override client to store the call id
     final Client client = new Client(LongWritable.class, conf);
-    Client.setCallIdAndRetryCount(Client.nextCallId(), 255);
+    Client.setCallIdAndRetryCount(Client.nextCallId(), 255, null);
 
     // Attach a listener that tracks every call ID received by the server.
     final TestServer server = new TestServer(1, false);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org