You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by at...@apache.org on 2011/10/06 03:01:19 UTC

svn commit: r1179483 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/io/retry/ src/test/java/org/apache/hadoop/io/retry/

Author: atm
Date: Thu Oct  6 01:01:19 2011
New Revision: 1179483

URL: http://svn.apache.org/viewvc?rev=1179483&view=rev
Log:
HADOOP-7717. Move handling of concurrent client fail-overs to RetryInvocationHandler (atm)

Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableInterface.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1179483&r1=1179482&r2=1179483&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Thu Oct  6 01:01:19 2011
@@ -17,15 +17,18 @@ Trunk (unreleased changes)
                  close (atm)
     
     HADOOP-7668. Add a NetUtils method that can tell if an InetAddress 
-    belongs to local host. (suresh)
+                 belongs to local host. (suresh)
 
     HADOOP-7687 Make getProtocolSignature public  (sanjay)
 
     HADOOP-7693. Enhance AvroRpcEngine to support the new #addProtocol
-    interface introduced in HADOOP-7524.  (cutting)
+                 interface introduced in HADOOP-7524.  (cutting)
 
-	HADOOP-7716 RPC protocol registration on SS does not log the protocol name
-	(only the class which may be different) (sanjay)
+	  HADOOP-7716. RPC protocol registration on SS does not log the protocol name
+	               (only the class which may be different) (sanjay)
+
+    HADOOP-7717. Move handling of concurrent client fail-overs to
+                 RetryInvocationHandler (atm)
 
   BUGS
 

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java?rev=1179483&r1=1179482&r2=1179483&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java Thu Oct  6 01:01:19 2011
@@ -24,6 +24,7 @@ import java.lang.reflect.InvocationTarge
 import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,6 +33,11 @@ import org.apache.hadoop.io.retry.RetryP
 class RetryInvocationHandler implements InvocationHandler, Closeable {
   public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
   private FailoverProxyProvider proxyProvider;
+
+  /**
+   * The number of times the associated proxyProvider has ever been failed over.
+   */
+  private long proxyProviderFailoverCount = 0;
   
   private RetryPolicy defaultPolicy;
   private Map<String,RetryPolicy> methodNameToPolicyMap;
@@ -60,16 +66,24 @@ class RetryInvocationHandler implements 
       policy = defaultPolicy;
     }
     
-    int failovers = 0;
+    // The number of times this method invocation has been failed over.
+    int invocationFailoverCount = 0;
     int retries = 0;
     while (true) {
+      // The number of times this invocation handler has ever been failed over,
+      // before this method invocation attempt. Used to prevent concurrent
+      // failed method invocations from triggering multiple failover attempts.
+      long invocationAttemptFailoverCount;
+      synchronized (proxyProvider) {
+        invocationAttemptFailoverCount = proxyProviderFailoverCount;
+      }
       try {
         return invokeMethod(method, args);
       } catch (Exception e) {
         boolean isMethodIdempotent = proxyProvider.getInterface()
             .getMethod(method.getName(), method.getParameterTypes())
             .isAnnotationPresent(Idempotent.class);
-        RetryAction action = policy.shouldRetry(e, retries++, failovers,
+        RetryAction action = policy.shouldRetry(e, retries++, invocationFailoverCount,
             isMethodIdempotent);
         if (action == RetryAction.FAIL) {
           LOG.warn("Exception while invoking " + method.getName()
@@ -81,10 +95,24 @@ class RetryInvocationHandler implements 
         } else if (action == RetryAction.FAILOVER_AND_RETRY) {
           LOG.warn("Exception while invoking " + method.getName()
               + " of " + currentProxy.getClass()
-              + ". Trying to fail over.", e);
-          failovers++;
-          proxyProvider.performFailover(currentProxy);
+              + " after " + invocationFailoverCount + " fail over attempts."
+              + " Trying to fail over.", e);
+          // Make sure that concurrent failed method invocations only cause a
+          // single actual fail over.
+          synchronized (proxyProvider) {
+            if (invocationAttemptFailoverCount == proxyProviderFailoverCount) {
+              proxyProvider.performFailover(currentProxy);
+              proxyProviderFailoverCount++;
+            } else {
+              LOG.warn("A failover has occurred since the start of this method"
+                  + " invocation attempt.");
+            }
+          }
+          // Strictly speaking, getProxy() only needs to be called when
+          // performFailover() was actually invoked above, but it is called
+          // unconditionally here for the purpose of testing.
           currentProxy = proxyProvider.getProxy();
+          invocationFailoverCount++;
         }
         if(LOG.isDebugEnabled()) {
           LOG.debug("Exception while invoking " + method.getName()

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java?rev=1179483&r1=1179482&r2=1179483&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java Thu Oct  6 01:01:19 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.io.retry;
 import static org.junit.Assert.*;
 
 import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.hadoop.io.retry.UnreliableImplementation.TypeOfExceptionToFailWith;
 import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException;
@@ -35,22 +36,41 @@ public class TestFailoverProxy {
     private Object impl1;
     private Object impl2;
     
+    private boolean latchEnabled = false;
+    private CountDownLatch getProxyLatch;
+    private int failoversOccurred = 0;
+    
     public FlipFlopProxyProvider(Class<?> iface, Object activeImpl,
-        Object standbyImpl) {
+        Object standbyImpl, int getProxyCountDown) {
       this.iface = iface;
       this.impl1 = activeImpl;
       this.impl2 = standbyImpl;
       currentlyActive = impl1;
+      getProxyLatch = new CountDownLatch(getProxyCountDown);
+    }
+    
+    public FlipFlopProxyProvider(Class<?> iface, Object activeImpl,
+        Object standbyImpl) {
+      this(iface, activeImpl, standbyImpl, 0);
     }
     
     @Override
     public Object getProxy() {
+      if (latchEnabled) {
+        getProxyLatch.countDown();
+        try {
+          getProxyLatch.await();
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
       return currentlyActive;
     }
 
     @Override
-    public void performFailover(Object currentProxy) {
+    public synchronized void performFailover(Object currentProxy) {
       currentlyActive = impl1 == currentProxy ? impl2 : impl1;
+      failoversOccurred++;
     }
 
     @Override
@@ -63,6 +83,13 @@ public class TestFailoverProxy {
       // Nothing to do.
     }
     
+    public void setLatchEnabled(boolean latchEnabled) {
+      this.latchEnabled = latchEnabled;
+    }
+    
+    public int getFailoversOccurred() {
+      return failoversOccurred;
+    }
   }
   
   public static class FailOverOnceOnAnyExceptionPolicy implements RetryPolicy {
@@ -186,4 +213,55 @@ public class TestFailoverProxy {
     // IOException and this method is idempotent.
     assertEquals("impl2", unreliable.succeedsOnceThenFailsReturningStringIdempotent());
   }
-}
+  
+  private static class ConcurrentMethodThread extends Thread {
+    
+    private UnreliableInterface unreliable;
+    public String result;
+    
+    public ConcurrentMethodThread(UnreliableInterface unreliable) {
+      this.unreliable = unreliable;
+    }
+    
+    public void run() {
+      try {
+        result = unreliable.failsIfIdentifierDoesntMatch("impl2");
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Test that concurrent failed method invocations only result in a single
+   * failover.
+   */
+  @Test
+  public void testConcurrentMethodFailures() throws InterruptedException {
+    FlipFlopProxyProvider proxyProvider = new FlipFlopProxyProvider(
+        UnreliableInterface.class,
+        new UnreliableImplementation("impl1",
+            TypeOfExceptionToFailWith.STANDBY_EXCEPTION),
+        new UnreliableImplementation("impl2",
+            TypeOfExceptionToFailWith.STANDBY_EXCEPTION),
+        2);
+    
+    final UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
+      .create(UnreliableInterface.class, proxyProvider,
+          RetryPolicies.failoverOnNetworkException(10));
+
+    ConcurrentMethodThread t1 = new ConcurrentMethodThread(unreliable);
+    ConcurrentMethodThread t2 = new ConcurrentMethodThread(unreliable);
+    
+    // Getting a proxy will now wait on a latch.
+    proxyProvider.setLatchEnabled(true);
+    
+    t1.start();
+    t2.start();
+    t1.join();
+    t2.join();
+    assertEquals("impl2", t1.result);
+    assertEquals("impl2", t2.result);
+    assertEquals(1, proxyProvider.getFailoversOccurred());
+  }
+}
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java?rev=1179483&r1=1179482&r2=1179483&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java Thu Oct  6 01:01:19 2011
@@ -141,4 +141,23 @@ public class UnreliableImplementation im
     }
   }
 
+  @Override
+  public String failsIfIdentifierDoesntMatch(String identifier)
+      throws UnreliableException, StandbyException, IOException {
+    if (this.identifier.equals(identifier)) {
+      return identifier;
+    } else {
+      switch (exceptionToFailWith) {
+      case STANDBY_EXCEPTION:
+        throw new StandbyException(identifier);
+      case UNRELIABLE_EXCEPTION:
+        throw new UnreliableException(identifier);
+      case IO_EXCEPTION:
+        throw new IOException(identifier);
+      default:
+        throw new RuntimeException(identifier);
+      }
+    }
+  }
+
 }

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableInterface.java?rev=1179483&r1=1179482&r2=1179483&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableInterface.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableInterface.java Thu Oct  6 01:01:19 2011
@@ -63,4 +63,8 @@ public interface UnreliableInterface {
       throws UnreliableException, StandbyException, IOException;
   public String succeedsTenTimesThenFailsReturningString()
       throws UnreliableException, StandbyException, IOException;
+  
+  @Idempotent
+  public String failsIfIdentifierDoesntMatch(String identifier)
+      throws UnreliableException, StandbyException, IOException;
 }