You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/03/04 02:02:01 UTC

[22/50] [abbrv] hadoop git commit: HADOOP-12622. Improve the loggings in RetryPolicies and RetryInvocationHandler. Contributed by Junping Du

HADOOP-12622. Improve the loggings in RetryPolicies and RetryInvocationHandler. Contributed by Junping Du


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d8f390d0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d8f390d0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d8f390d0

Branch: refs/heads/HDFS-1312
Commit: d8f390d015510950ccf78174af8891cd613d4438
Parents: 9dafaaa
Author: Jian He <ji...@apache.org>
Authored: Mon Feb 29 16:24:05 2016 -0800
Committer: Jian He <ji...@apache.org>
Committed: Mon Feb 29 16:24:05 2016 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   3 +
 .../hadoop/io/retry/RetryInvocationHandler.java |  22 ++--
 .../apache/hadoop/io/retry/RetryPolicies.java   |  60 ++++++++---
 .../apache/hadoop/io/retry/TestRetryProxy.java  | 101 +++++++++++++++++--
 4 files changed, 153 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8f390d0/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index f444b71..8655d24 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -1165,6 +1165,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12825. Log slow name resolutions.
     (Sidharta Seethana via stevel)
 
+    HADOOP-12622. Improve the loggings in RetryPolicies and RetryInvocationHandler.
+    (Junping Du via jianhe)
+
   OPTIMIZATIONS
 
     HADOOP-11785. Reduce the number of listStatus operation in distcp

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8f390d0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
index 6864d5d..a67c84f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
@@ -120,6 +120,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
                 invocationFailoverCount, isIdempotentOrAtMostOnce);
         RetryAction failAction = getFailAction(actions);
         if (failAction != null) {
+          // fail.
           if (failAction.reason != null) {
             LOG.warn("Exception while invoking " + currentProxy.proxy.getClass()
                 + "." + method.getName() + " over " + currentProxy.proxyInfo
@@ -135,7 +136,8 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
           worthLogging |= LOG.isDebugEnabled();
           RetryAction failOverAction = getFailOverAction(actions);
           long delay = getDelayMillis(actions);
-          if (failOverAction != null && worthLogging) {
+
+          if (worthLogging) {
             String msg = "Exception while invoking " + method.getName()
                 + " of class " + currentProxy.proxy.getClass().getSimpleName()
                 + " over " + currentProxy.proxyInfo;
@@ -143,21 +145,21 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
             if (invocationFailoverCount > 0) {
               msg += " after " + invocationFailoverCount + " fail over attempts"; 
             }
-            msg += ". Trying to fail over " + formatSleepMessage(delay);
-            LOG.info(msg, ex);
-          } else {
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("Exception while invoking " + method.getName()
-                  + " of class " + currentProxy.proxy.getClass().getSimpleName()
-                  + " over " + currentProxy.proxyInfo + ". Retrying "
-                  + formatSleepMessage(delay), ex);
+
+            if (failOverAction != null) {
+              // failover
+              msg += ". Trying to fail over " + formatSleepMessage(delay);
+            } else {
+              // retry
+              msg += ". Retrying " + formatSleepMessage(delay);
             }
+            LOG.info(msg, ex);
           }
 
           if (delay > 0) {
             Thread.sleep(delay);
           }
-          
+
           if (failOverAction != null) {
             // Make sure that concurrent failed method invocations only cause a
             // single actual fail over.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8f390d0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
index 171d52a..131aa8f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -39,6 +39,8 @@ import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * <p>
  * A collection of useful implementations of {@link RetryPolicy}.
@@ -177,10 +179,11 @@ public class RetryPolicies {
     @Override
     public RetryAction shouldRetry(Exception e, int retries, int failovers,
         boolean isIdempotentOrAtMostOnce) throws Exception {
-      return RetryAction.FAIL;
+      return new RetryAction(RetryAction.RetryDecision.FAIL, 0, "try once " +
+          "and fail.");
     }
   }
-  
+
   static class RetryForever implements RetryPolicy {
     @Override
     public RetryAction shouldRetry(Exception e, int retries, int failovers,
@@ -221,14 +224,24 @@ public class RetryPolicies {
     public RetryAction shouldRetry(Exception e, int retries, int failovers,
         boolean isIdempotentOrAtMostOnce) throws Exception {
       if (retries >= maxRetries) {
-        return RetryAction.FAIL;
+        return new RetryAction(RetryAction.RetryDecision.FAIL, 0 , getReason());
       }
       return new RetryAction(RetryAction.RetryDecision.RETRY,
-          timeUnit.toMillis(calculateSleepTime(retries)));
+          timeUnit.toMillis(calculateSleepTime(retries)), getReason());
     }
-    
+
+    protected String getReason() {
+      return constructReasonString(maxRetries);
+    }
+
+    @VisibleForTesting
+    public static String constructReasonString(int retries) {
+      return "retries get failed due to exceeded maximum allowed retries " +
+          "number: " + retries;
+    }
+
     protected abstract long calculateSleepTime(int retries);
-    
+
     @Override
     public int hashCode() {
       return toString().hashCode();
@@ -264,18 +277,37 @@ public class RetryPolicies {
       return sleepTime;
     }
   }
-  
-  static class RetryUpToMaximumTimeWithFixedSleep extends RetryUpToMaximumCountWithFixedSleep {
-    public RetryUpToMaximumTimeWithFixedSleep(long maxTime, long sleepTime, TimeUnit timeUnit) {
+
+  static class RetryUpToMaximumTimeWithFixedSleep extends
+      RetryUpToMaximumCountWithFixedSleep {
+    private long maxTime = 0;
+    private TimeUnit timeUnit;
+
+    public RetryUpToMaximumTimeWithFixedSleep(long maxTime, long sleepTime,
+        TimeUnit timeUnit) {
       super((int) (maxTime / sleepTime), sleepTime, timeUnit);
+      this.maxTime = maxTime;
+      this.timeUnit = timeUnit;
+    }
+
+    @Override
+    protected String getReason() {
+      return constructReasonString(this.maxTime, this.timeUnit);
+    }
+
+    @VisibleForTesting
+    public static String constructReasonString(long maxTime,
+        TimeUnit timeUnit) {
+      return "retries get failed due to exceeded maximum allowed time (" +
+          "in " + timeUnit.toString() + "): " + maxTime;
     }
   }
-  
+
   static class RetryUpToMaximumCountWithProportionalSleep extends RetryLimited {
     public RetryUpToMaximumCountWithProportionalSleep(int maxRetries, long sleepTime, TimeUnit timeUnit) {
       super(maxRetries, sleepTime, timeUnit);
     }
-    
+
     @Override
     protected long calculateSleepTime(int retries) {
       return sleepTime * (retries + 1);
@@ -332,7 +364,8 @@ public class RetryPolicies {
       final Pair p = searchPair(curRetry);
       if (p == null) {
         //no more retries.
-        return RetryAction.FAIL;
+        return new RetryAction(RetryAction.RetryDecision.FAIL, 0 , "Retry " +
+            "all pairs in MultipleLinearRandomRetry: " + pairs);
       }
 
       //calculate sleep time and return.
@@ -549,6 +582,7 @@ public class RetryPolicies {
     protected long calculateSleepTime(int retries) {
       return calculateExponentialTime(sleepTime, retries + 1);
     }
+
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8f390d0/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestRetryProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestRetryProxy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestRetryProxy.java
index 35a45b4..4137dae 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestRetryProxy.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestRetryProxy.java
@@ -28,6 +28,15 @@ import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWith
 import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumTimeWithFixedSleep;
 import static org.apache.hadoop.io.retry.RetryPolicies.retryForeverWithFixedSleep;
 import static org.apache.hadoop.io.retry.RetryPolicies.exponentialBackoffRetry;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import static org.junit.Assert.*;
 
 import java.io.IOException;
@@ -41,10 +50,19 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
+import org.apache.hadoop.io.retry.RetryPolicies.RetryUpToMaximumCountWithFixedSleep;
+import org.apache.hadoop.io.retry.RetryPolicies.RetryUpToMaximumTimeWithFixedSleep;
+import org.apache.hadoop.io.retry.RetryPolicies.TryOnceThenFail;
 import org.apache.hadoop.io.retry.UnreliableInterface.FatalException;
 import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RemoteException;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
 import org.junit.Before;
 import org.junit.Test;
 
@@ -53,25 +71,57 @@ import java.lang.reflect.UndeclaredThrowableException;
 public class TestRetryProxy {
   
   private UnreliableImplementation unreliableImpl;
+  private RetryAction caughtRetryAction = null;
   
   @Before
   public void setUp() throws Exception {
     unreliableImpl = new UnreliableImplementation();
   }
 
+  // Delegate mockPolicy.shouldRetry to realPolicy, capturing its return value
+  private void setupMockPolicy(RetryPolicy mockPolicy,
+      final RetryPolicy realPolicy) throws Exception {
+    when(mockPolicy.shouldRetry(any(Exception.class), anyInt(), anyInt(),
+        anyBoolean())).thenAnswer(new Answer<RetryAction>() {
+      @SuppressWarnings("rawtypes")
+      @Override
+      public RetryAction answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        Exception e = (Exception) args[0];
+        int retries = (int) args[1];
+        int failovers = (int) args[2];
+        boolean isIdempotentOrAtMostOnce = (boolean) args[3];
+        caughtRetryAction = realPolicy.shouldRetry(e, retries, failovers,
+            isIdempotentOrAtMostOnce);
+        return caughtRetryAction;
+      }
+    });
+  }
+
   @Test
-  public void testTryOnceThenFail() throws UnreliableException {
+  public void testTryOnceThenFail() throws Exception {
+    RetryPolicy policy = mock(TryOnceThenFail.class);
+    RetryPolicy realPolicy = TRY_ONCE_THEN_FAIL;
+    setupMockPolicy(policy, realPolicy);
+
     UnreliableInterface unreliable = (UnreliableInterface)
-      RetryProxy.create(UnreliableInterface.class, unreliableImpl, TRY_ONCE_THEN_FAIL);
+      RetryProxy.create(UnreliableInterface.class, unreliableImpl, policy);
     unreliable.alwaysSucceeds();
     try {
       unreliable.failsOnceThenSucceeds();
       fail("Should fail");
     } catch (UnreliableException e) {
       // expected
+      verify(policy, times(1)).shouldRetry(any(Exception.class), anyInt(),
+          anyInt(), anyBoolean());
+      assertEquals(RetryDecision.FAIL, caughtRetryAction.action);
+      assertEquals("try once and fail.", caughtRetryAction.reason);
+    } catch (Exception e) {
+      fail("Other exception other than UnreliableException should also get " +
+          "failed.");
     }
   }
-  
+
   /**
    * Test for {@link RetryInvocationHandler#isRpcInvocation(Object)}
    */
@@ -125,25 +175,48 @@ public class TestRetryProxy {
   }
 
   @Test
-  public void testRetryUpToMaximumCountWithFixedSleep() throws UnreliableException {
+  public void testRetryUpToMaximumCountWithFixedSleep() throws
+      Exception {
+
+    RetryPolicy policy = mock(RetryUpToMaximumCountWithFixedSleep.class);
+    int maxRetries = 8;
+    RetryPolicy realPolicy = retryUpToMaximumCountWithFixedSleep(maxRetries, 1,
+        TimeUnit.NANOSECONDS);
+    setupMockPolicy(policy, realPolicy);
+
     UnreliableInterface unreliable = (UnreliableInterface)
-      RetryProxy.create(UnreliableInterface.class, unreliableImpl,
-                        retryUpToMaximumCountWithFixedSleep(8, 1, TimeUnit.NANOSECONDS));
+      RetryProxy.create(UnreliableInterface.class, unreliableImpl, policy);
+    // shouldRetry += 1
     unreliable.alwaysSucceeds();
+    // shouldRetry += 2
     unreliable.failsOnceThenSucceeds();
     try {
+      // shouldRetry += (maxRetries -1) (just failed once above)
       unreliable.failsTenTimesThenSucceeds();
       fail("Should fail");
     } catch (UnreliableException e) {
       // expected
+      verify(policy, times(maxRetries + 2)).shouldRetry(any(Exception.class),
+          anyInt(), anyInt(), anyBoolean());
+      assertEquals(RetryDecision.FAIL, caughtRetryAction.action);
+      assertEquals(RetryUpToMaximumCountWithFixedSleep.constructReasonString(
+          maxRetries), caughtRetryAction.reason);
+    } catch (Exception e) {
+      fail("Other exception other than UnreliableException should also get " +
+          "failed.");
     }
   }
   
   @Test
-  public void testRetryUpToMaximumTimeWithFixedSleep() throws UnreliableException {
+  public void testRetryUpToMaximumTimeWithFixedSleep() throws Exception {
+    RetryPolicy policy = mock(RetryUpToMaximumTimeWithFixedSleep.class);
+    long maxTime = 80L;
+    RetryPolicy realPolicy = retryUpToMaximumTimeWithFixedSleep(maxTime, 10,
+        TimeUnit.NANOSECONDS);
+    setupMockPolicy(policy, realPolicy);
+
     UnreliableInterface unreliable = (UnreliableInterface)
-      RetryProxy.create(UnreliableInterface.class, unreliableImpl,
-                        retryUpToMaximumTimeWithFixedSleep(80, 10, TimeUnit.NANOSECONDS));
+      RetryProxy.create(UnreliableInterface.class, unreliableImpl, policy);
     unreliable.alwaysSucceeds();
     unreliable.failsOnceThenSucceeds();
     try {
@@ -151,9 +224,17 @@ public class TestRetryProxy {
       fail("Should fail");
     } catch (UnreliableException e) {
       // expected
+      verify(policy, times((int)(maxTime/10) + 2)).shouldRetry(any(Exception.class),
+          anyInt(), anyInt(), anyBoolean());
+      assertEquals(RetryDecision.FAIL, caughtRetryAction.action);
+      assertEquals(RetryUpToMaximumTimeWithFixedSleep.constructReasonString(
+          maxTime, TimeUnit.NANOSECONDS), caughtRetryAction.reason);
+    } catch (Exception e) {
+      fail("Other exception other than UnreliableException should also get " +
+          "failed.");
     }
   }
-  
+
   @Test
   public void testRetryUpToMaximumCountWithProportionalSleep() throws UnreliableException {
     UnreliableInterface unreliable = (UnreliableInterface)