You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by at...@apache.org on 2011/12/14 08:24:37 UTC

svn commit: r1214076 - in /hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/io/retry/ src/main/java/org/apache/hadoop/util/ src/test/java/org/apache/hadoop/io/retry/

Author: atm
Date: Wed Dec 14 07:24:36 2011
New Revision: 1214076

URL: http://svn.apache.org/viewvc?rev=1214076&view=rev
Log:
HADOOP-7896. HA: if both NNs are in Standby mode, client needs to try failing back and forth several times with sleeps. Contributed by Aaron T. Myers

Added:
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java
Modified:
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java

Modified: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt?rev=1214076&r1=1214075&r2=1214076&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt Wed Dec 14 07:24:36 2011
@@ -5,4 +5,8 @@ branch is merged.
 ------------------------------
 
 HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh)
+
 HADOOP-7774. HA: Administrative CLI to control HA daemons. (todd)
+
+HADOOP-7896. HA: if both NNs are in Standby mode, client needs to try failing
+             back and forth several times with sleeps. (atm)

Modified: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java?rev=1214076&r1=1214075&r2=1214076&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java Wed Dec 14 07:24:36 2011
@@ -24,11 +24,11 @@ import java.lang.reflect.InvocationTarge
 import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.util.ThreadUtil;
 
 class RetryInvocationHandler implements InvocationHandler, Closeable {
   public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
@@ -85,31 +85,38 @@ class RetryInvocationHandler implements 
             .isAnnotationPresent(Idempotent.class);
         RetryAction action = policy.shouldRetry(e, retries++, invocationFailoverCount,
             isMethodIdempotent);
-        if (action == RetryAction.FAIL) {
+        if (action.action == RetryAction.RetryDecision.FAIL) {
           LOG.warn("Exception while invoking " + method.getName()
                    + " of " + currentProxy.getClass() + ". Not retrying.", e);
           if (!method.getReturnType().equals(Void.TYPE)) {
             throw e; // non-void methods can't fail without an exception
           }
           return null;
-        } else if (action == RetryAction.FAILOVER_AND_RETRY) {
-          LOG.warn("Exception while invoking " + method.getName()
-              + " of " + currentProxy.getClass()
-              + " after " + invocationFailoverCount + " fail over attempts."
-              + " Trying to fail over.", e);
-          // Make sure that concurrent failed method invocations only cause a
-          // single actual fail over.
-          synchronized (proxyProvider) {
-            if (invocationAttemptFailoverCount == proxyProviderFailoverCount) {
-              proxyProvider.performFailover(currentProxy);
-              proxyProviderFailoverCount++;
-              currentProxy = proxyProvider.getProxy();
-            } else {
-              LOG.warn("A failover has occurred since the start of this method"
-                  + " invocation attempt.");
+        } else { // retry or failover
+          
+          if (action.delayMillis > 0) {
+            ThreadUtil.sleepAtLeastIgnoreInterrupts(action.delayMillis);
+          }
+          
+          if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) {
+            LOG.warn("Exception while invoking " + method.getName()
+                + " of " + currentProxy.getClass()
+                + " after " + invocationFailoverCount + " fail over attempts."
+                + " Trying to fail over.", e);
+            // Make sure that concurrent failed method invocations only cause a
+            // single actual fail over.
+            synchronized (proxyProvider) {
+              if (invocationAttemptFailoverCount == proxyProviderFailoverCount) {
+                proxyProvider.performFailover(currentProxy);
+                proxyProviderFailoverCount++;
+                currentProxy = proxyProvider.getProxy();
+              } else {
+                LOG.warn("A failover has occurred since the start of this method"
+                    + " invocation attempt.");
+              }
             }
+            invocationFailoverCount++;
           }
-          invocationFailoverCount++;
         }
         if(LOG.isDebugEnabled()) {
           LOG.debug("Exception while invoking " + method.getName()

Modified: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java?rev=1214076&r1=1214075&r2=1214076&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java Wed Dec 14 07:24:36 2011
@@ -33,6 +33,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.StandbyException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * <p>
  * A collection of useful implementations of {@link RetryPolicy}.
@@ -42,6 +44,8 @@ public class RetryPolicies {
   
   public static final Log LOG = LogFactory.getLog(RetryPolicies.class);
   
+  private static final Random RAND = new Random();
+  
   /**
    * <p>
    * Try once, and fail by re-throwing the exception.
@@ -137,7 +141,14 @@ public class RetryPolicies {
   
   public static final RetryPolicy failoverOnNetworkException(
       RetryPolicy fallbackPolicy, int maxFailovers) {
-    return new FailoverOnNetworkExceptionRetry(fallbackPolicy, maxFailovers);
+    return failoverOnNetworkException(fallbackPolicy, maxFailovers, 0, 0);
+  }
+  
+  public static final RetryPolicy failoverOnNetworkException(
+      RetryPolicy fallbackPolicy, int maxFailovers, long delayMillis,
+      long maxDelayBase) {
+    return new FailoverOnNetworkExceptionRetry(fallbackPolicy, maxFailovers,
+        delayMillis, maxDelayBase);
   }
   
   static class TryOnceThenFail implements RetryPolicy {
@@ -176,12 +187,8 @@ public class RetryPolicies {
       if (retries >= maxRetries) {
         throw e;
       }
-      try {
-        timeUnit.sleep(calculateSleepTime(retries));
-      } catch (InterruptedException ie) {
-        // retry
-      }
-      return RetryAction.RETRY;
+      return new RetryAction(RetryAction.RetryDecision.RETRY,
+          timeUnit.toMillis(calculateSleepTime(retries)));
     }
     
     protected abstract long calculateSleepTime(int retries);
@@ -268,7 +275,7 @@ public class RetryPolicies {
   }
   
   static class ExponentialBackoffRetry extends RetryLimited {
-    private Random r = new Random();
+    
     public ExponentialBackoffRetry(
         int maxRetries, long sleepTime, TimeUnit timeUnit) {
       super(maxRetries, sleepTime, timeUnit);
@@ -276,16 +283,19 @@ public class RetryPolicies {
     
     @Override
     protected long calculateSleepTime(int retries) {
-      return sleepTime*r.nextInt(1<<(retries+1));
+      return calculateExponentialTime(sleepTime, retries + 1);
     }
   }
   
-  /*
+  /**
    * Fail over and retry in the case of:
    *   Remote StandbyException (server is up, but is not the active server)
    *   Immediate socket exceptions (e.g. no route to host, econnrefused)
    *   Socket exceptions after initial connection when operation is idempotent
    * 
+   * The first failover is immediate, while all subsequent failovers wait an
+   * exponentially-increasing random amount of time.
+   * 
    * Fail immediately in the case of:
    *   Socket exceptions after initial connection when operation is not idempotent
    * 
@@ -295,11 +305,20 @@ public class RetryPolicies {
     
     private RetryPolicy fallbackPolicy;
     private int maxFailovers;
+    private long delayMillis;
+    private long maxDelayBase;
     
     public FailoverOnNetworkExceptionRetry(RetryPolicy fallbackPolicy,
         int maxFailovers) {
+      this(fallbackPolicy, maxFailovers, 0, 0);
+    }
+    
+    public FailoverOnNetworkExceptionRetry(RetryPolicy fallbackPolicy,
+        int maxFailovers, long delayMillis, long maxDelayBase) {
       this.fallbackPolicy = fallbackPolicy;
       this.maxFailovers = maxFailovers;
+      this.delayMillis = delayMillis;
+      this.maxDelayBase = maxDelayBase;
     }
 
     @Override
@@ -314,8 +333,13 @@ public class RetryPolicies {
       if (e instanceof ConnectException ||
           e instanceof NoRouteToHostException ||
           e instanceof UnknownHostException ||
-          e instanceof StandbyException) {
-        return RetryAction.FAILOVER_AND_RETRY;
+          e instanceof StandbyException ||
+          isWrappedStandbyException(e)) {
+        return new RetryAction(
+            RetryAction.RetryDecision.FAILOVER_AND_RETRY,
+            // retry immediately if this is our first failover, sleep otherwise
+            failovers == 0 ? 0 :
+                calculateExponentialTime(delayMillis, failovers, maxDelayBase));
       } else if (e instanceof SocketException ||
                  e instanceof IOException) {
         if (isMethodIdempotent) {
@@ -330,4 +354,54 @@ public class RetryPolicies {
     }
     
   }
+
+  /**
+   * Return a value which is <code>time</code> increasing exponentially as a
+   * function of <code>retries</code> (i.e. time * 2^retries), capped at
+   * <code>cap</code>, and then multiplied by a random factor in [0.5, 1.5),
+   * i.e. +/- up to 50% of the capped value.
+   * 
+   * If time * 2^retries would overflow a <code>long</code>, the base time
+   * saturates at <code>cap</code> instead of wrapping around to a negative or
+   * tiny value, which would otherwise silently disable the back-off.
+   * 
+   * @param time the base amount of time to work with (presumably non-negative)
+   * @param retries the number of retries that have occurred so far
+   * @param cap value at which to cap the base sleep time
+   * @return an amount of time to sleep
+   */
+  @VisibleForTesting
+  public static long calculateExponentialTime(long time, int retries,
+      long cap) {
+    long baseTime;
+    if (time > 0 && retries >= 0 && (retries >= Long.SIZE - 1
+        || time > (Long.MAX_VALUE >> retries))) {
+      // time * 2^retries would overflow a long; saturate at the cap instead.
+      baseTime = cap;
+    } else {
+      baseTime = Math.min(time * ((long)1 << retries), cap);
+    }
+    // nextFloat() is in [0, 1), so the random multiplier is in [0.5, 1.5).
+    return (long) (baseTime * (RAND.nextFloat() + 0.5));
+  }
+
+  /**
+   * Uncapped variant of {@link #calculateExponentialTime(long, int, long)}.
+   */
+  private static long calculateExponentialTime(long time, int retries) {
+    return calculateExponentialTime(time, retries, Long.MAX_VALUE);
+  }
+  
+  /**
+   * @return true if <code>e</code> is a {@link RemoteException} which unwraps
+   *         to a {@link StandbyException}, false otherwise
+   */
+  private static boolean isWrappedStandbyException(Exception e) {
+    if (!(e instanceof RemoteException)) {
+      return false;
+    }
+    Exception unwrapped = ((RemoteException)e).unwrapRemoteException(
+        StandbyException.class);
+    return unwrapped instanceof StandbyException;
+  }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java?rev=1214076&r1=1214075&r2=1214076&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java Wed Dec 14 07:24:36 2011
@@ -19,7 +19,6 @@ package org.apache.hadoop.io.retry;
 
 import org.apache.hadoop.classification.InterfaceStability;
 
-
 /**
  * <p>
  * Specifies a policy for retrying method failures.
@@ -33,10 +32,46 @@ public interface RetryPolicy {
    * Returned by {@link RetryPolicy#shouldRetry(Exception, int, int, boolean)}.
    */
   @InterfaceStability.Evolving
-  public enum RetryAction {
-    FAIL,
-    RETRY,
-    FAILOVER_AND_RETRY
+  public static class RetryAction {
+    
+    /** The possible outcomes of a {@link RetryPolicy} decision. */
+    public enum RetryDecision {
+      FAIL,
+      RETRY,
+      FAILOVER_AND_RETRY
+    }
+    
+    /** The decision made by the retry policy. */
+    public final RetryDecision action;
+    
+    /** Milliseconds to wait before acting on {@link #action}. */
+    public final long delayMillis;
+    
+    // Shared instances for the common decisions; none of them has a delay.
+    public static final RetryAction FAIL =
+        new RetryAction(RetryDecision.FAIL);
+    public static final RetryAction RETRY =
+        new RetryAction(RetryDecision.RETRY);
+    public static final RetryAction FAILOVER_AND_RETRY =
+        new RetryAction(RetryDecision.FAILOVER_AND_RETRY);
+    
+    /**
+     * Create an action with no delay.
+     * 
+     * @param action the decision to act on
+     */
+    public RetryAction(RetryDecision action) {
+      this(action, 0L);
+    }
+    
+    /**
+     * @param action the decision to act on
+     * @param delayMillis milliseconds to wait before acting on the decision
+     */
+    public RetryAction(RetryDecision action, long delayMillis) {
+      this.delayMillis = delayMillis;
+      this.action = action;
+    }
   }
   
   /**

Added: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java?rev=1214076&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java (added)
+++ hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java Wed Dec 14 07:24:36 2011
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceStability.Evolving
+public class ThreadUtil {
+  
+  private static final Log LOG = LogFactory.getLog(ThreadUtil.class);
+
+  /**
+   * Cause the current thread to sleep as close as possible to the provided
+   * number of milliseconds. This method will log and ignore any
+   * {@link InterruptedException} encountered.
+   * 
+   * Elapsed time is measured with {@link System#nanoTime()} rather than the
+   * wall clock, so clock adjustments cannot cut the sleep short or stretch it
+   * out. After an interrupt the remaining time is slept again; the thread's
+   * interrupt status is not restored.
+   * 
+   * @param millis the number of milliseconds for the current thread to sleep;
+   *        non-positive values return immediately
+   */
+  public static void sleepAtLeastIgnoreInterrupts(long millis) {
+    if (millis <= 0) {
+      return;
+    }
+    final long start = System.nanoTime();
+    final long durationNanos = TimeUnit.MILLISECONDS.toNanos(millis);
+    long remainingNanos = durationNanos;
+    while (remainingNanos > 0) {
+      try {
+        // Sleep the exact remainder rather than truncating it to whole
+        // milliseconds, which could yield Thread.sleep(0) and spin.
+        TimeUnit.NANOSECONDS.sleep(remainingNanos);
+      } catch (InterruptedException ie) {
+        LOG.warn("interrupted while sleeping", ie);
+      }
+      remainingNanos = durationNanos - (System.nanoTime() - start);
+    }
+  }
+}

Modified: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java?rev=1214076&r1=1214075&r2=1214076&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java Wed Dec 14 07:24:36 2011
@@ -25,6 +25,7 @@ import java.util.concurrent.CountDownLat
 import org.apache.hadoop.io.retry.UnreliableImplementation.TypeOfExceptionToFailWith;
 import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException;
 import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.util.ThreadUtil;
 import org.junit.Test;
 
 public class TestFailoverProxy {
@@ -267,4 +268,40 @@ public class TestFailoverProxy {
     assertEquals("impl2", t2.result);
     assertEquals(1, proxyProvider.getFailoversOccurred());
   }
+
+  /**
+   * Ensure that when all configured services throw StandbyException, the
+   * client keeps failing over back and forth between them (sleeping between
+   * attempts after the first failover) until one stops throwing it.
+   */
+  @Test
+  public void testFailoverBetweenMultipleStandbys()
+      throws UnreliableException, StandbyException, IOException {
+    
+    final long millisToSleep = 10000; // how long impl1 keeps failing
+    
+    final UnreliableImplementation impl1 = new UnreliableImplementation("impl1",
+        TypeOfExceptionToFailWith.STANDBY_EXCEPTION);
+    FlipFlopProxyProvider proxyProvider = new FlipFlopProxyProvider(
+        UnreliableInterface.class,
+        impl1,
+        new UnreliableImplementation("impl2",
+            TypeOfExceptionToFailWith.STANDBY_EXCEPTION));
+    
+    final UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
+      .create(UnreliableInterface.class, proxyProvider,
+          RetryPolicies.failoverOnNetworkException( // 10 failovers, 1s base, 10s cap
+              RetryPolicies.TRY_ONCE_THEN_FAIL, 10, 1000, 10000));
+    
+    new Thread() {
+      @Override
+      public void run() {
+        ThreadUtil.sleepAtLeastIgnoreInterrupts(millisToSleep);
+        impl1.setIdentifier("renamed-impl1"); // impl1 now accepts the call
+      }
+    }.start();
+    
+    String result = unreliable.failsIfIdentifierDoesntMatch("renamed-impl1");
+    assertEquals("renamed-impl1", result);
+  }
 }
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java?rev=1214076&r1=1214075&r2=1214076&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java Wed Dec 14 07:24:36 2011
@@ -48,6 +48,10 @@ public class UnreliableImplementation im
     this(identifier, TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION);
   }
   
+  public void setIdentifier(String identifier) {
+    this.identifier = identifier; // the value later identifier checks compare against
+  }
+  
   public UnreliableImplementation(String identifier,
       TypeOfExceptionToFailWith exceptionToFailWith) {
     this.identifier = identifier;
@@ -147,15 +151,17 @@ public class UnreliableImplementation im
     if (this.identifier.equals(identifier)) {
       return identifier;
     } else {
+      String message = "expected '" + this.identifier + "' but received '" +
+          identifier + "'";
       switch (exceptionToFailWith) {
       case STANDBY_EXCEPTION:
-        throw new StandbyException(identifier);
+        throw new StandbyException(message);
       case UNRELIABLE_EXCEPTION:
-        throw new UnreliableException(identifier);
+        throw new UnreliableException(message);
       case IO_EXCEPTION:
-        throw new IOException(identifier);
+        throw new IOException(message);
       default:
-        throw new RuntimeException(identifier);
+        throw new RuntimeException(message);
       }
     }
   }