You are viewing a plain-text version of this content. (The link to its canonical version was a hyperlink and is not preserved in this plain-text rendering.)
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/03/04 02:02:01 UTC
[22/50] [abbrv] hadoop git commit: HADOOP-12622. Improve the loggings in RetryPolicies and RetryInvocationHandler. Contributed by Junping Du
HADOOP-12622. Improve the loggings in RetryPolicies and RetryInvocationHandler. Contributed by Junping Du
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d8f390d0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d8f390d0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d8f390d0
Branch: refs/heads/HDFS-1312
Commit: d8f390d015510950ccf78174af8891cd613d4438
Parents: 9dafaaa
Author: Jian He <ji...@apache.org>
Authored: Mon Feb 29 16:24:05 2016 -0800
Committer: Jian He <ji...@apache.org>
Committed: Mon Feb 29 16:24:05 2016 -0800
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +
.../hadoop/io/retry/RetryInvocationHandler.java | 22 ++--
.../apache/hadoop/io/retry/RetryPolicies.java | 60 ++++++++---
.../apache/hadoop/io/retry/TestRetryProxy.java | 101 +++++++++++++++++--
4 files changed, 153 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8f390d0/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index f444b71..8655d24 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -1165,6 +1165,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12825. Log slow name resolutions.
(Sidharta Seethana via stevel)
+ HADOOP-12622. Improve the loggings in RetryPolicies and RetryInvocationHandler.
+ (Junping Du via jianhe)
+
OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8f390d0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
index 6864d5d..a67c84f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
@@ -120,6 +120,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
invocationFailoverCount, isIdempotentOrAtMostOnce);
RetryAction failAction = getFailAction(actions);
if (failAction != null) {
+ // fail.
if (failAction.reason != null) {
LOG.warn("Exception while invoking " + currentProxy.proxy.getClass()
+ "." + method.getName() + " over " + currentProxy.proxyInfo
@@ -135,7 +136,8 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
worthLogging |= LOG.isDebugEnabled();
RetryAction failOverAction = getFailOverAction(actions);
long delay = getDelayMillis(actions);
- if (failOverAction != null && worthLogging) {
+
+ if (worthLogging) {
String msg = "Exception while invoking " + method.getName()
+ " of class " + currentProxy.proxy.getClass().getSimpleName()
+ " over " + currentProxy.proxyInfo;
@@ -143,21 +145,21 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
if (invocationFailoverCount > 0) {
msg += " after " + invocationFailoverCount + " fail over attempts";
}
- msg += ". Trying to fail over " + formatSleepMessage(delay);
- LOG.info(msg, ex);
- } else {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Exception while invoking " + method.getName()
- + " of class " + currentProxy.proxy.getClass().getSimpleName()
- + " over " + currentProxy.proxyInfo + ". Retrying "
- + formatSleepMessage(delay), ex);
+
+ if (failOverAction != null) {
+ // failover
+ msg += ". Trying to fail over " + formatSleepMessage(delay);
+ } else {
+ // retry
+ msg += ". Retrying " + formatSleepMessage(delay);
}
+ LOG.info(msg, ex);
}
if (delay > 0) {
Thread.sleep(delay);
}
-
+
if (failOverAction != null) {
// Make sure that concurrent failed method invocations only cause a
// single actual fail over.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8f390d0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
index 171d52a..131aa8f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -39,6 +39,8 @@ import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* <p>
* A collection of useful implementations of {@link RetryPolicy}.
@@ -177,10 +179,11 @@ public class RetryPolicies {
@Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
boolean isIdempotentOrAtMostOnce) throws Exception {
- return RetryAction.FAIL;
+ return new RetryAction(RetryAction.RetryDecision.FAIL, 0, "try once " +
+ "and fail.");
}
}
-
+
static class RetryForever implements RetryPolicy {
@Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
@@ -221,14 +224,24 @@ public class RetryPolicies {
public RetryAction shouldRetry(Exception e, int retries, int failovers,
boolean isIdempotentOrAtMostOnce) throws Exception {
if (retries >= maxRetries) {
- return RetryAction.FAIL;
+ return new RetryAction(RetryAction.RetryDecision.FAIL, 0 , getReason());
}
return new RetryAction(RetryAction.RetryDecision.RETRY,
- timeUnit.toMillis(calculateSleepTime(retries)));
+ timeUnit.toMillis(calculateSleepTime(retries)), getReason());
}
-
+
+ protected String getReason() {
+ return constructReasonString(maxRetries);
+ }
+
+ @VisibleForTesting
+ public static String constructReasonString(int retries) {
+ return "retries get failed due to exceeded maximum allowed retries " +
+ "number: " + retries;
+ }
+
protected abstract long calculateSleepTime(int retries);
-
+
@Override
public int hashCode() {
return toString().hashCode();
@@ -264,18 +277,37 @@ public class RetryPolicies {
return sleepTime;
}
}
-
- static class RetryUpToMaximumTimeWithFixedSleep extends RetryUpToMaximumCountWithFixedSleep {
- public RetryUpToMaximumTimeWithFixedSleep(long maxTime, long sleepTime, TimeUnit timeUnit) {
+
+ static class RetryUpToMaximumTimeWithFixedSleep extends
+ RetryUpToMaximumCountWithFixedSleep {
+ private long maxTime = 0;
+ private TimeUnit timeUnit;
+
+ public RetryUpToMaximumTimeWithFixedSleep(long maxTime, long sleepTime,
+ TimeUnit timeUnit) {
super((int) (maxTime / sleepTime), sleepTime, timeUnit);
+ this.maxTime = maxTime;
+ this.timeUnit = timeUnit;
+ }
+
+ @Override
+ protected String getReason() {
+ return constructReasonString(this.maxTime, this.timeUnit);
+ }
+
+ @VisibleForTesting
+ public static String constructReasonString(long maxTime,
+ TimeUnit timeUnit) {
+ return "retries get failed due to exceeded maximum allowed time (" +
+ "in " + timeUnit.toString() + "): " + maxTime;
}
}
-
+
static class RetryUpToMaximumCountWithProportionalSleep extends RetryLimited {
public RetryUpToMaximumCountWithProportionalSleep(int maxRetries, long sleepTime, TimeUnit timeUnit) {
super(maxRetries, sleepTime, timeUnit);
}
-
+
@Override
protected long calculateSleepTime(int retries) {
return sleepTime * (retries + 1);
@@ -332,7 +364,8 @@ public class RetryPolicies {
final Pair p = searchPair(curRetry);
if (p == null) {
//no more retries.
- return RetryAction.FAIL;
+ return new RetryAction(RetryAction.RetryDecision.FAIL, 0 , "Retry " +
+ "all pairs in MultipleLinearRandomRetry: " + pairs);
}
//calculate sleep time and return.
@@ -549,6 +582,7 @@ public class RetryPolicies {
protected long calculateSleepTime(int retries) {
return calculateExponentialTime(sleepTime, retries + 1);
}
+
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8f390d0/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestRetryProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestRetryProxy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestRetryProxy.java
index 35a45b4..4137dae 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestRetryProxy.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestRetryProxy.java
@@ -28,6 +28,15 @@ import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWith
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumTimeWithFixedSleep;
import static org.apache.hadoop.io.retry.RetryPolicies.retryForeverWithFixedSleep;
import static org.apache.hadoop.io.retry.RetryPolicies.exponentialBackoffRetry;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import static org.junit.Assert.*;
import java.io.IOException;
@@ -41,10 +50,19 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
+import org.apache.hadoop.io.retry.RetryPolicies.RetryUpToMaximumCountWithFixedSleep;
+import org.apache.hadoop.io.retry.RetryPolicies.RetryUpToMaximumTimeWithFixedSleep;
+import org.apache.hadoop.io.retry.RetryPolicies.TryOnceThenFail;
import org.apache.hadoop.io.retry.UnreliableInterface.FatalException;
import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RemoteException;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
import org.junit.Before;
import org.junit.Test;
@@ -53,25 +71,57 @@ import java.lang.reflect.UndeclaredThrowableException;
public class TestRetryProxy {
private UnreliableImplementation unreliableImpl;
+ private RetryAction caughtRetryAction = null;
@Before
public void setUp() throws Exception {
unreliableImpl = new UnreliableImplementation();
}
+ // answer mockPolicy's method with realPolicy, caught method's return value
+ private void setupMockPolicy(RetryPolicy mockPolicy,
+ final RetryPolicy realPolicy) throws Exception {
+ when(mockPolicy.shouldRetry(any(Exception.class), anyInt(), anyInt(),
+ anyBoolean())).thenAnswer(new Answer<RetryAction>() {
+ @SuppressWarnings("rawtypes")
+ @Override
+ public RetryAction answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ Exception e = (Exception) args[0];
+ int retries = (int) args[1];
+ int failovers = (int) args[2];
+ boolean isIdempotentOrAtMostOnce = (boolean) args[3];
+ caughtRetryAction = realPolicy.shouldRetry(e, retries, failovers,
+ isIdempotentOrAtMostOnce);
+ return caughtRetryAction;
+ }
+ });
+ }
+
@Test
- public void testTryOnceThenFail() throws UnreliableException {
+ public void testTryOnceThenFail() throws Exception {
+ RetryPolicy policy = mock(TryOnceThenFail.class);
+ RetryPolicy realPolicy = TRY_ONCE_THEN_FAIL;
+ setupMockPolicy(policy, realPolicy);
+
UnreliableInterface unreliable = (UnreliableInterface)
- RetryProxy.create(UnreliableInterface.class, unreliableImpl, TRY_ONCE_THEN_FAIL);
+ RetryProxy.create(UnreliableInterface.class, unreliableImpl, policy);
unreliable.alwaysSucceeds();
try {
unreliable.failsOnceThenSucceeds();
fail("Should fail");
} catch (UnreliableException e) {
// expected
+ verify(policy, times(1)).shouldRetry(any(Exception.class), anyInt(),
+ anyInt(), anyBoolean());
+ assertEquals(RetryDecision.FAIL, caughtRetryAction.action);
+ assertEquals("try once and fail.", caughtRetryAction.reason);
+ } catch (Exception e) {
+ fail("Other exception other than UnreliableException should also get " +
+ "failed.");
}
}
-
+
/**
* Test for {@link RetryInvocationHandler#isRpcInvocation(Object)}
*/
@@ -125,25 +175,48 @@ public class TestRetryProxy {
}
@Test
- public void testRetryUpToMaximumCountWithFixedSleep() throws UnreliableException {
+ public void testRetryUpToMaximumCountWithFixedSleep() throws
+ Exception {
+
+ RetryPolicy policy = mock(RetryUpToMaximumCountWithFixedSleep.class);
+ int maxRetries = 8;
+ RetryPolicy realPolicy = retryUpToMaximumCountWithFixedSleep(maxRetries, 1,
+ TimeUnit.NANOSECONDS);
+ setupMockPolicy(policy, realPolicy);
+
UnreliableInterface unreliable = (UnreliableInterface)
- RetryProxy.create(UnreliableInterface.class, unreliableImpl,
- retryUpToMaximumCountWithFixedSleep(8, 1, TimeUnit.NANOSECONDS));
+ RetryProxy.create(UnreliableInterface.class, unreliableImpl, policy);
+ // shouldRetry += 1
unreliable.alwaysSucceeds();
+ // shouldRetry += 2
unreliable.failsOnceThenSucceeds();
try {
+ // shouldRetry += (maxRetries -1) (just failed once above)
unreliable.failsTenTimesThenSucceeds();
fail("Should fail");
} catch (UnreliableException e) {
// expected
+ verify(policy, times(maxRetries + 2)).shouldRetry(any(Exception.class),
+ anyInt(), anyInt(), anyBoolean());
+ assertEquals(RetryDecision.FAIL, caughtRetryAction.action);
+ assertEquals(RetryUpToMaximumCountWithFixedSleep.constructReasonString(
+ maxRetries), caughtRetryAction.reason);
+ } catch (Exception e) {
+ fail("Other exception other than UnreliableException should also get " +
+ "failed.");
}
}
@Test
- public void testRetryUpToMaximumTimeWithFixedSleep() throws UnreliableException {
+ public void testRetryUpToMaximumTimeWithFixedSleep() throws Exception {
+ RetryPolicy policy = mock(RetryUpToMaximumTimeWithFixedSleep.class);
+ long maxTime = 80L;
+ RetryPolicy realPolicy = retryUpToMaximumTimeWithFixedSleep(maxTime, 10,
+ TimeUnit.NANOSECONDS);
+ setupMockPolicy(policy, realPolicy);
+
UnreliableInterface unreliable = (UnreliableInterface)
- RetryProxy.create(UnreliableInterface.class, unreliableImpl,
- retryUpToMaximumTimeWithFixedSleep(80, 10, TimeUnit.NANOSECONDS));
+ RetryProxy.create(UnreliableInterface.class, unreliableImpl, policy);
unreliable.alwaysSucceeds();
unreliable.failsOnceThenSucceeds();
try {
@@ -151,9 +224,17 @@ public class TestRetryProxy {
fail("Should fail");
} catch (UnreliableException e) {
// expected
+ verify(policy, times((int)(maxTime/10) + 2)).shouldRetry(any(Exception.class),
+ anyInt(), anyInt(), anyBoolean());
+ assertEquals(RetryDecision.FAIL, caughtRetryAction.action);
+ assertEquals(RetryUpToMaximumTimeWithFixedSleep.constructReasonString(
+ maxTime, TimeUnit.NANOSECONDS), caughtRetryAction.reason);
+ } catch (Exception e) {
+ fail("Other exception other than UnreliableException should also get " +
+ "failed.");
}
}
-
+
@Test
public void testRetryUpToMaximumCountWithProportionalSleep() throws UnreliableException {
UnreliableInterface unreliable = (UnreliableInterface)