You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2016/06/25 07:46:06 UTC
hadoop git commit: HADOOP-13227. AsyncCallHandler should use an event driven architecture to handle async calls.
Repository: hadoop
Updated Branches:
refs/heads/trunk bf74dbf80 -> d328e6670
HADOOP-13227. AsyncCallHandler should use an event driven architecture to handle async calls.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d328e667
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d328e667
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d328e667
Branch: refs/heads/trunk
Commit: d328e667067743f723e332d92154da8e84e65742
Parents: bf74dbf
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Sat Jun 25 15:45:16 2016 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Sat Jun 25 15:45:16 2016 +0800
----------------------------------------------------------------------
.../hadoop/io/retry/AsyncCallHandler.java | 104 ++++++----
.../org/apache/hadoop/io/retry/CallReturn.java | 3 +
.../hadoop/io/retry/RetryInvocationHandler.java | 194 ++++++++++++-------
.../org/apache/hadoop/io/retry/RetryPolicy.java | 1 +
.../main/java/org/apache/hadoop/ipc/Client.java | 15 +-
.../hadoop/io/retry/TestDefaultRetryPolicy.java | 14 +-
.../org/apache/hadoop/ipc/TestAsyncIPC.java | 4 +-
.../java/org/apache/hadoop/ipc/TestIPC.java | 6 +-
8 files changed, 224 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328e667/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
index 5a03b03..69e1233 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.io.retry;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -27,17 +28,21 @@ import org.apache.hadoop.util.concurrent.AsyncGet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.InterruptedIOException;
import java.lang.reflect.Method;
-import java.util.LinkedList;
+import java.util.Iterator;
import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/** Handle async calls. */
@InterfaceAudience.Private
public class AsyncCallHandler {
- static final Logger LOG = LoggerFactory.getLogger(AsyncCallHandler.class);
+ public static final Logger LOG = LoggerFactory.getLogger(
+ AsyncCallHandler.class);
private static final ThreadLocal<AsyncGet<?, Exception>>
LOWER_LAYER_ASYNC_RETURN = new ThreadLocal<>();
@@ -73,35 +78,34 @@ public class AsyncCallHandler {
/** A simple concurrent queue which keeping track the empty start time. */
static class ConcurrentQueue<T> {
- private final Queue<T> queue = new LinkedList<>();
- private long emptyStartTime = Time.monotonicNow();
+ private final Queue<T> queue = new ConcurrentLinkedQueue<>();
+ private final AtomicLong emptyStartTime
+ = new AtomicLong(Time.monotonicNow());
- synchronized int size() {
- return queue.size();
+ Iterator<T> iterator() {
+ return queue.iterator();
}
/** Is the queue empty for more than the given time in millisecond? */
- synchronized boolean isEmpty(long time) {
- return queue.isEmpty() && Time.monotonicNow() - emptyStartTime > time;
+ boolean isEmpty(long time) {
+ return Time.monotonicNow() - emptyStartTime.get() > time
+ && queue.isEmpty();
}
- synchronized void offer(T c) {
+ void offer(T c) {
final boolean added = queue.offer(c);
Preconditions.checkState(added);
}
- synchronized T poll() {
- Preconditions.checkState(!queue.isEmpty());
- final T t = queue.poll();
+ void checkEmpty() {
if (queue.isEmpty()) {
- emptyStartTime = Time.monotonicNow();
+ emptyStartTime.set(Time.monotonicNow());
}
- return t;
}
}
/** A queue for handling async calls. */
- static class AsyncCallQueue {
+ class AsyncCallQueue {
private final ConcurrentQueue<AsyncCall> queue = new ConcurrentQueue<>();
private final Processor processor = new Processor();
@@ -113,20 +117,29 @@ public class AsyncCallHandler {
processor.tryStart();
}
- void checkCalls() {
- final int size = queue.size();
- for (int i = 0; i < size; i++) {
- final AsyncCall c = queue.poll();
- if (!c.isDone()) {
- queue.offer(c); // the call is not done yet, add it back.
+ long checkCalls() {
+ final long startTime = Time.monotonicNow();
+ long minWaitTime = Processor.MAX_WAIT_PERIOD;
+
+ for (final Iterator<AsyncCall> i = queue.iterator(); i.hasNext();) {
+ final AsyncCall c = i.next();
+ if (c.isDone()) {
+ i.remove(); // the call is done, remove it from the queue.
+ queue.checkEmpty();
+ } else {
+ final Long waitTime = c.getWaitTime(startTime);
+ if (waitTime != null && waitTime > 0 && waitTime < minWaitTime) {
+ minWaitTime = waitTime;
+ }
}
}
+ return minWaitTime;
}
/** Process the async calls in the queue. */
private class Processor {
- static final long GRACE_PERIOD = 10*1000L;
- static final long SLEEP_PERIOD = 100L;
+ static final long GRACE_PERIOD = 3*1000L;
+ static final long MAX_WAIT_PERIOD = 100L;
private final AtomicReference<Thread> running = new AtomicReference<>();
@@ -141,15 +154,16 @@ public class AsyncCallHandler {
@Override
public void run() {
for (; isRunning(this);) {
+ final long waitTime = checkCalls();
+ tryStop(this);
+
try {
- Thread.sleep(SLEEP_PERIOD);
+ synchronized (AsyncCallHandler.this) {
+ AsyncCallHandler.this.wait(waitTime);
+ }
} catch (InterruptedException e) {
kill(this);
- return;
}
-
- checkCalls();
- tryStop(this);
}
}
};
@@ -215,10 +229,9 @@ public class AsyncCallHandler {
private AsyncGet<?, Exception> lowerLayerAsyncGet;
AsyncCall(Method method, Object[] args, boolean isRpc, int callId,
- RetryInvocationHandler.Counters counters,
RetryInvocationHandler<?> retryInvocationHandler,
AsyncCallHandler asyncCallHandler) {
- super(method, args, isRpc, callId, counters, retryInvocationHandler);
+ super(method, args, isRpc, callId, retryInvocationHandler);
this.asyncCallHandler = asyncCallHandler;
}
@@ -226,6 +239,7 @@ public class AsyncCallHandler {
/** @return true if the call is done; otherwise, return false. */
boolean isDone() {
final CallReturn r = invokeOnce();
+ LOG.debug("#{}: {}", getCallId(), r.getState());
switch (r.getState()) {
case RETURNED:
case EXCEPTION:
@@ -234,6 +248,7 @@ public class AsyncCallHandler {
case RETRY:
invokeOnce();
break;
+ case WAIT_RETRY:
case ASYNC_CALL_IN_PROGRESS:
case ASYNC_INVOKED:
// nothing to do
@@ -245,12 +260,24 @@ public class AsyncCallHandler {
}
@Override
+ CallReturn processWaitTimeAndRetryInfo() {
+ final Long waitTime = getWaitTime(Time.monotonicNow());
+ LOG.trace("#{} processRetryInfo: waitTime={}", getCallId(), waitTime);
+ if (waitTime != null && waitTime > 0) {
+ return CallReturn.WAIT_RETRY;
+ }
+ processRetryInfo();
+ return CallReturn.RETRY;
+ }
+
+ @Override
CallReturn invoke() throws Throwable {
LOG.debug("{}.invoke {}", getClass().getSimpleName(), this);
if (lowerLayerAsyncGet != null) {
// async call was submitted early, check the lower level async call
final boolean isDone = lowerLayerAsyncGet.isDone();
- LOG.trace("invoke: lowerLayerAsyncGet.isDone()? {}", isDone);
+ LOG.trace("#{} invoke: lowerLayerAsyncGet.isDone()? {}",
+ getCallId(), isDone);
if (!isDone) {
return CallReturn.ASYNC_CALL_IN_PROGRESS;
}
@@ -262,7 +289,7 @@ public class AsyncCallHandler {
}
// submit a new async call
- LOG.trace("invoke: ASYNC_INVOKED");
+ LOG.trace("#{} invoke: ASYNC_INVOKED", getCallId());
final boolean mode = Client.isAsynchronousMode();
try {
Client.setAsynchronousMode(true);
@@ -271,9 +298,9 @@ public class AsyncCallHandler {
Preconditions.checkState(r == null);
lowerLayerAsyncGet = getLowerLayerAsyncReturn();
- if (counters.isZeros()) {
+ if (getCounters().isZeros()) {
// first async attempt, initialize
- LOG.trace("invoke: initAsyncCall");
+ LOG.trace("#{} invoke: initAsyncCall", getCallId());
asyncCallHandler.initAsyncCall(this, asyncCallReturn);
}
return CallReturn.ASYNC_INVOKED;
@@ -287,9 +314,9 @@ public class AsyncCallHandler {
private volatile boolean hasSuccessfulCall = false;
AsyncCall newAsyncCall(Method method, Object[] args, boolean isRpc,
- int callId, RetryInvocationHandler.Counters counters,
+ int callId,
RetryInvocationHandler<?> retryInvocationHandler) {
- return new AsyncCall(method, args, isRpc, callId, counters,
+ return new AsyncCall(method, args, isRpc, callId,
retryInvocationHandler, this);
}
@@ -318,4 +345,9 @@ public class AsyncCallHandler {
};
ASYNC_RETURN.set(asyncGet);
}
+
+ @VisibleForTesting
+ public static long getGracePeriod() {
+ return AsyncCallQueue.Processor.GRACE_PERIOD;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328e667/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
index 943725c..022b785 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
@@ -29,6 +29,8 @@ class CallReturn {
EXCEPTION,
/** Call should be retried according to the {@link RetryPolicy}. */
RETRY,
+ /** Call should wait and then retry according to the {@link RetryPolicy}. */
+ WAIT_RETRY,
/** Call, which is async, is still in progress. */
ASYNC_CALL_IN_PROGRESS,
/** Call, which is async, just has been invoked. */
@@ -39,6 +41,7 @@ class CallReturn {
State.ASYNC_CALL_IN_PROGRESS);
static final CallReturn ASYNC_INVOKED = new CallReturn(State.ASYNC_INVOKED);
static final CallReturn RETRY = new CallReturn(State.RETRY);
+ static final CallReturn WAIT_RETRY = new CallReturn(State.WAIT_RETRY);
private final Object returnValue;
private final Throwable thrown;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328e667/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
index 5198c0d..7bd3a15 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
@@ -18,13 +18,14 @@
package org.apache.hadoop.io.retry;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.*;
import org.apache.hadoop.ipc.Client.ConnectionId;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -41,33 +42,51 @@ import java.util.Map;
*/
@InterfaceAudience.Private
public class RetryInvocationHandler<T> implements RpcInvocationHandler {
- public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
+ public static final Logger LOG = LoggerFactory.getLogger(
+ RetryInvocationHandler.class);
static class Call {
private final Method method;
private final Object[] args;
private final boolean isRpc;
private final int callId;
- final Counters counters;
+ private final Counters counters = new Counters();
private final RetryPolicy retryPolicy;
private final RetryInvocationHandler<?> retryInvocationHandler;
+ private RetryInfo retryInfo;
+
Call(Method method, Object[] args, boolean isRpc, int callId,
- Counters counters, RetryInvocationHandler<?> retryInvocationHandler) {
+ RetryInvocationHandler<?> retryInvocationHandler) {
this.method = method;
this.args = args;
this.isRpc = isRpc;
this.callId = callId;
- this.counters = counters;
this.retryPolicy = retryInvocationHandler.getRetryPolicy(method);
this.retryInvocationHandler = retryInvocationHandler;
}
+ int getCallId() {
+ return callId;
+ }
+
+ Counters getCounters() {
+ return counters;
+ }
+
+ synchronized Long getWaitTime(final long now) {
+ return retryInfo == null? null: retryInfo.retryTime - now;
+ }
+
/** Invoke the call once without retrying. */
synchronized CallReturn invokeOnce() {
try {
+ if (retryInfo != null) {
+ return processWaitTimeAndRetryInfo();
+ }
+
// The number of times this invocation handler has ever been failed over
// before this method invocation attempt. Used to prevent concurrent
// failed method invocations from triggering multiple failover attempts.
@@ -76,28 +95,70 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
return invoke();
} catch (Exception e) {
if (LOG.isTraceEnabled()) {
- LOG.trace(this, e);
+ LOG.trace(toString(), e);
}
if (Thread.currentThread().isInterrupted()) {
// If interrupted, do not retry.
throw e;
}
- retryInvocationHandler.handleException(
- method, retryPolicy, failoverCount, counters, e);
- return CallReturn.RETRY;
+
+ retryInfo = retryInvocationHandler.handleException(
+ method, callId, retryPolicy, counters, failoverCount, e);
+ return processWaitTimeAndRetryInfo();
}
} catch(Throwable t) {
return new CallReturn(t);
}
}
+ /**
+ * It first processes the wait time, if there is any,
+ * and then invokes {@link #processRetryInfo()}.
+ *
+ * If the wait time is positive, it either sleeps for synchronous calls
+ * or immediately returns for asynchronous calls.
+ *
+ * @return {@link CallReturn#RETRY} if the retryInfo is processed;
+ * otherwise, return {@link CallReturn#WAIT_RETRY}.
+ */
+ CallReturn processWaitTimeAndRetryInfo() throws InterruptedIOException {
+ final Long waitTime = getWaitTime(Time.monotonicNow());
+ LOG.trace("#{} processRetryInfo: retryInfo={}, waitTime={}",
+ callId, retryInfo, waitTime);
+ if (waitTime != null && waitTime > 0) {
+ try {
+ Thread.sleep(retryInfo.delay);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted while waiting to retry", e);
+ InterruptedIOException intIOE = new InterruptedIOException(
+ "Retry interrupted");
+ intIOE.initCause(e);
+ throw intIOE;
+ }
+ }
+ processRetryInfo();
+ return CallReturn.RETRY;
+ }
+
+ synchronized void processRetryInfo() {
+ counters.retries++;
+ if (retryInfo.isFailover()) {
+ retryInvocationHandler.proxyDescriptor.failover(
+ retryInfo.expectedFailoverCount, method, callId);
+ counters.failovers++;
+ }
+ retryInfo = null;
+ }
+
CallReturn invoke() throws Throwable {
return new CallReturn(invokeMethod());
}
Object invokeMethod() throws Throwable {
if (isRpc) {
- Client.setCallIdAndRetryCount(callId, counters.retries);
+ Client.setCallIdAndRetryCount(callId, counters.retries,
+ retryInvocationHandler.asyncCallHandler);
}
return retryInvocationHandler.invokeMethod(method, args);
}
@@ -146,15 +207,16 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
return failoverCount;
}
- synchronized void failover(long expectedFailoverCount, Method method) {
+ synchronized void failover(long expectedFailoverCount, Method method,
+ int callId) {
// Make sure that concurrent failed invocations only cause a single
// actual failover.
if (failoverCount == expectedFailoverCount) {
fpp.performFailover(proxyInfo.proxy);
failoverCount++;
} else {
- LOG.warn("A failover has occurred since the start of "
- + proxyInfo.getString(method.getName()));
+ LOG.warn("A failover has occurred since the start of call #" + callId
+ + " " + proxyInfo.getString(method.getName()));
}
proxyInfo = fpp.getProxy();
}
@@ -172,22 +234,33 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
}
private static class RetryInfo {
+ private final long retryTime;
private final long delay;
- private final RetryAction failover;
- private final RetryAction fail;
+ private final RetryAction action;
+ private final long expectedFailoverCount;
- RetryInfo(long delay, RetryAction failover, RetryAction fail) {
+ RetryInfo(long delay, RetryAction action, long expectedFailoverCount) {
this.delay = delay;
- this.failover = failover;
- this.fail = fail;
+ this.retryTime = Time.monotonicNow() + delay;
+ this.action = action;
+ this.expectedFailoverCount = expectedFailoverCount;
+ }
+
+ boolean isFailover() {
+ return action != null
+ && action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY;
+ }
+
+ boolean isFail() {
+ return action != null
+ && action.action == RetryAction.RetryDecision.FAIL;
}
static RetryInfo newRetryInfo(RetryPolicy policy, Exception e,
- Counters counters, boolean idempotentOrAtMostOnce) throws Exception {
+ Counters counters, boolean idempotentOrAtMostOnce,
+ long expectedFailoverCount) throws Exception {
+ RetryAction max = null;
long maxRetryDelay = 0;
- RetryAction failover = null;
- RetryAction retry = null;
- RetryAction fail = null;
final Iterable<Exception> exceptions = e instanceof MultiException ?
((MultiException) e).getExceptions().values()
@@ -195,23 +268,19 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
for (Exception exception : exceptions) {
final RetryAction a = policy.shouldRetry(exception,
counters.retries, counters.failovers, idempotentOrAtMostOnce);
- if (a.action == RetryAction.RetryDecision.FAIL) {
- fail = a;
- } else {
+ if (a.action != RetryAction.RetryDecision.FAIL) {
// must be a retry or failover
- if (a.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) {
- failover = a;
- } else {
- retry = a;
- }
if (a.delayMillis > maxRetryDelay) {
maxRetryDelay = a.delayMillis;
}
}
+
+ if (max == null || max.action.compareTo(a.action) < 0) {
+ max = a;
+ }
}
- return new RetryInfo(maxRetryDelay, failover,
- failover == null && retry == null? fail: null);
+ return new RetryInfo(maxRetryDelay, max, expectedFailoverCount);
}
}
@@ -246,13 +315,12 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
return proxyDescriptor.getFailoverCount();
}
- private Call newCall(Method method, Object[] args, boolean isRpc, int callId,
- Counters counters) {
+ private Call newCall(Method method, Object[] args, boolean isRpc,
+ int callId) {
if (Client.isAsynchronousMode()) {
- return asyncCallHandler.newAsyncCall(method, args, isRpc, callId,
- counters, this);
+ return asyncCallHandler.newAsyncCall(method, args, isRpc, callId, this);
} else {
- return new Call(method, args, isRpc, callId, counters, this);
+ return new Call(method, args, isRpc, callId, this);
}
}
@@ -261,9 +329,8 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
throws Throwable {
final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy());
final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
- final Counters counters = new Counters();
- final Call call = newCall(method, args, isRpc, callId, counters);
+ final Call call = newCall(method, args, isRpc, callId);
while (true) {
final CallReturn c = call.invokeOnce();
final CallReturn.State state = c.getState();
@@ -275,45 +342,24 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
}
}
- private void handleException(final Method method, final RetryPolicy policy,
- final long expectedFailoverCount, final Counters counters,
- final Exception ex) throws Exception {
- final RetryInfo retryInfo = RetryInfo.newRetryInfo(policy, ex, counters,
- proxyDescriptor.idempotentOrAtMostOnce(method));
- counters.retries++;
-
- if (retryInfo.fail != null) {
+ private RetryInfo handleException(final Method method, final int callId,
+ final RetryPolicy policy, final Counters counters,
+ final long expectFailoverCount, final Exception e) throws Exception {
+ final RetryInfo retryInfo = RetryInfo.newRetryInfo(policy, e,
+ counters, proxyDescriptor.idempotentOrAtMostOnce(method),
+ expectFailoverCount);
+ if (retryInfo.isFail()) {
// fail.
- if (retryInfo.fail.reason != null) {
- LOG.warn("Exception while invoking "
+ if (retryInfo.action.reason != null) {
+ LOG.warn("Exception while invoking call #" + callId + " "
+ proxyDescriptor.getProxyInfo().getString(method.getName())
- + ". Not retrying because " + retryInfo.fail.reason, ex);
- }
- throw ex;
- }
-
- // retry
- final boolean isFailover = retryInfo.failover != null;
-
- log(method, isFailover, counters.failovers, retryInfo.delay, ex);
-
- if (retryInfo.delay > 0) {
- try {
- Thread.sleep(retryInfo.delay);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn("Interrupted while waiting to retry", e);
- InterruptedIOException intIOE = new InterruptedIOException(
- "Retry interrupted");
- intIOE.initCause(e);
- throw intIOE;
+ + ". Not retrying because " + retryInfo.action.reason, e);
}
+ throw e;
}
- if (isFailover) {
- proxyDescriptor.failover(expectedFailoverCount, method);
- counters.failovers++;
- }
+ log(method, retryInfo.isFailover(), counters.failovers, retryInfo.delay, e);
+ return retryInfo;
}
private void log(final Method method, final boolean isFailover,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328e667/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java
index f3e2bd1..20c0307 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java
@@ -67,6 +67,7 @@ public interface RetryPolicy {
}
public enum RetryDecision {
+ // Ordering: FAIL < RETRY < FAILOVER_AND_RETRY.
FAIL,
RETRY,
FAILOVER_AND_RETRY
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328e667/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index ed8d905..183bad4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -93,6 +93,8 @@ public class Client implements AutoCloseable {
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
+ private static final ThreadLocal<Object> EXTERNAL_CALL_HANDLER
+ = new ThreadLocal<>();
private static final ThreadLocal<AsyncGet<? extends Writable, IOException>>
ASYNC_RPC_RESPONSE = new ThreadLocal<>();
private static final ThreadLocal<Boolean> asynchronousMode =
@@ -111,13 +113,15 @@ public class Client implements AutoCloseable {
}
/** Set call id and retry count for the next call. */
- public static void setCallIdAndRetryCount(int cid, int rc) {
+ public static void setCallIdAndRetryCount(int cid, int rc,
+ Object externalHandler) {
Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID);
Preconditions.checkState(callId.get() == null);
Preconditions.checkArgument(rc != RpcConstants.INVALID_RETRY_COUNT);
callId.set(cid);
retryCount.set(rc);
+ EXTERNAL_CALL_HANDLER.set(externalHandler);
}
private ConcurrentMap<ConnectionId, Connection> connections =
@@ -333,6 +337,7 @@ public class Client implements AutoCloseable {
IOException error; // exception, null if success
final RPC.RpcKind rpcKind; // Rpc EngineKind
boolean done; // true when call is done
+ private final Object externalHandler;
private Call(RPC.RpcKind rpcKind, Writable param) {
this.rpcKind = rpcKind;
@@ -352,6 +357,8 @@ public class Client implements AutoCloseable {
} else {
this.retry = rc;
}
+
+ this.externalHandler = EXTERNAL_CALL_HANDLER.get();
}
@Override
@@ -364,6 +371,12 @@ public class Client implements AutoCloseable {
protected synchronized void callComplete() {
this.done = true;
notify(); // notify caller
+
+ if (externalHandler != null) {
+ synchronized (externalHandler) {
+ externalHandler.notify();
+ }
+ }
}
/** Set the exception when there is an error.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328e667/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestDefaultRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestDefaultRetryPolicy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestDefaultRetryPolicy.java
index 8a61c04..56dec3a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestDefaultRetryPolicy.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestDefaultRetryPolicy.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.io.retry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException;
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
@@ -35,7 +36,18 @@ import static org.junit.Assert.assertThat;
*/
public class TestDefaultRetryPolicy {
@Rule
- public Timeout timeout = new Timeout(300000);
+ public Timeout timeout = new Timeout(30000);
+
+ /** Verify FAIL < RETRY < FAILOVER_AND_RETRY. */
+ @Test
+ public void testRetryDecisionOrdering() throws Exception {
+ Assert.assertTrue(RetryPolicy.RetryAction.RetryDecision.FAIL.compareTo(
+ RetryPolicy.RetryAction.RetryDecision.RETRY) < 0);
+ Assert.assertTrue(RetryPolicy.RetryAction.RetryDecision.RETRY.compareTo(
+ RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY) < 0);
+ Assert.assertTrue(RetryPolicy.RetryAction.RetryDecision.FAIL.compareTo(
+ RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY) < 0);
+ }
/**
* Verify that the default retry policy correctly retries
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328e667/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 4450c0c..3f2802f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -367,7 +367,7 @@ public class TestAsyncIPC {
Call createCall(RpcKind rpcKind, Writable rpcRequest) {
// Set different call id and retry count for the next call
Client.setCallIdAndRetryCount(Client.nextCallId(),
- TestIPC.RANDOM.nextInt(255));
+ TestIPC.RANDOM.nextInt(255), null);
final Call call = super.createCall(rpcKind, rpcRequest);
@@ -421,7 +421,7 @@ public class TestAsyncIPC {
final int retryCount = 255;
// Override client to store the call id
final Client client = new Client(LongWritable.class, conf);
- Client.setCallIdAndRetryCount(Client.nextCallId(), retryCount);
+ Client.setCallIdAndRetryCount(Client.nextCallId(), retryCount, null);
// Attach a listener that tracks every call ID received by the server.
final TestServer server = new TestIPC.TestServer(1, false, conf);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328e667/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
index 6bfcc53..82da62d 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
@@ -1172,7 +1172,7 @@ public class TestIPC {
retryProxy.dummyRun();
Assert.assertEquals(TestInvocationHandler.retry, totalRetry + 1);
} finally {
- Client.setCallIdAndRetryCount(0, 0);
+ Client.setCallIdAndRetryCount(0, 0, null);
client.stop();
server.stop();
}
@@ -1205,7 +1205,7 @@ public class TestIPC {
} finally {
// Check if dummyRun called only once
Assert.assertEquals(handler.invocations, 1);
- Client.setCallIdAndRetryCount(0, 0);
+ Client.setCallIdAndRetryCount(0, 0, null);
client.stop();
server.stop();
}
@@ -1250,7 +1250,7 @@ public class TestIPC {
final int retryCount = 255;
// Override client to store the call id
final Client client = new Client(LongWritable.class, conf);
- Client.setCallIdAndRetryCount(Client.nextCallId(), 255);
+ Client.setCallIdAndRetryCount(Client.nextCallId(), 255, null);
// Attach a listener that tracks every call ID received by the server.
final TestServer server = new TestServer(1, false);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org