You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2012/03/07 21:04:01 UTC
svn commit: r1298080 - in
/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common: ./
src/main/java/org/apache/hadoop/io/retry/
src/test/java/org/apache/hadoop/io/retry/
Author: suresh
Date: Wed Mar 7 20:04:00 2012
New Revision: 1298080
URL: http://svn.apache.org/viewvc?rev=1298080&view=rev
Log:
HADOOP-7717. Merge r1179483 from trunk to 0.23
Modified:
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableInterface.java
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1298080&r1=1298079&r2=1298080&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt Wed Mar 7 20:04:00 2012
@@ -32,6 +32,9 @@ Release 0.23.3 - UNRELEASED
HADOOP-7776. Make the Ipc-Header in a RPC-Payload an explicit header.
(sanjay)
+ HADOOP-7717. Move handling of concurrent client fail-overs to
+ RetryInvocationHandler (atm)
+
HADOOP-7862. Move the support for multiple protocols to lower layer so
that Writable, PB and Avro can all use it (Sanjay)
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java?rev=1298080&r1=1298079&r2=1298080&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java Wed Mar 7 20:04:00 2012
@@ -23,6 +23,7 @@ import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,6 +34,11 @@ import org.apache.hadoop.ipc.RpcInvocati
class RetryInvocationHandler implements RpcInvocationHandler {
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
private FailoverProxyProvider proxyProvider;
+
+ /**
+ * The number of times the associated proxyProvider has ever been failed over.
+ */
+ private long proxyProviderFailoverCount = 0;
private RetryPolicy defaultPolicy;
private Map<String,RetryPolicy> methodNameToPolicyMap;
@@ -61,16 +67,24 @@ class RetryInvocationHandler implements
policy = defaultPolicy;
}
- int failovers = 0;
+ // The number of times this method invocation has been failed over.
+ int invocationFailoverCount = 0;
int retries = 0;
while (true) {
+ // The number of times this invocation handler has ever been failed over,
+ // before this method invocation attempt. Used to prevent concurrent
+ // failed method invocations from triggering multiple failover attempts.
+ long invocationAttemptFailoverCount;
+ synchronized (proxyProvider) {
+ invocationAttemptFailoverCount = proxyProviderFailoverCount;
+ }
try {
return invokeMethod(method, args);
} catch (Exception e) {
boolean isMethodIdempotent = proxyProvider.getInterface()
.getMethod(method.getName(), method.getParameterTypes())
.isAnnotationPresent(Idempotent.class);
- RetryAction action = policy.shouldRetry(e, retries++, failovers,
+ RetryAction action = policy.shouldRetry(e, retries++, invocationFailoverCount,
isMethodIdempotent);
if (action == RetryAction.FAIL) {
LOG.warn("Exception while invoking " + method.getName()
@@ -82,10 +96,24 @@ class RetryInvocationHandler implements
} else if (action == RetryAction.FAILOVER_AND_RETRY) {
LOG.warn("Exception while invoking " + method.getName()
+ " of " + currentProxy.getClass()
- + ". Trying to fail over.", e);
- failovers++;
- proxyProvider.performFailover(currentProxy);
+ + " after " + invocationFailoverCount + " fail over attempts."
+ + " Trying to fail over.", e);
+ // Make sure that concurrent failed method invocations only cause a
+ // single actual fail over.
+ synchronized (proxyProvider) {
+ if (invocationAttemptFailoverCount == proxyProviderFailoverCount) {
+ proxyProvider.performFailover(currentProxy);
+ proxyProviderFailoverCount++;
+ } else {
+ LOG.warn("A failover has occurred since the start of this method"
+ + " invocation attempt.");
+ }
+ }
+ // Strictly speaking, getProxy() only needs to be called when
+ // performFailover() was actually invoked above, but it is called
+ // unconditionally here for the purpose of testing.
currentProxy = proxyProvider.getProxy();
+ invocationFailoverCount++;
}
if(LOG.isDebugEnabled()) {
LOG.debug("Exception while invoking " + method.getName()
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java?rev=1298080&r1=1298079&r2=1298080&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java Wed Mar 7 20:04:00 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.io.retry;
import static org.junit.Assert.*;
import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.io.retry.UnreliableImplementation.TypeOfExceptionToFailWith;
import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException;
@@ -35,22 +36,41 @@ public class TestFailoverProxy {
private Object impl1;
private Object impl2;
+ private boolean latchEnabled = false;
+ private CountDownLatch getProxyLatch;
+ private int failoversOccurred = 0;
+
public FlipFlopProxyProvider(Class<?> iface, Object activeImpl,
- Object standbyImpl) {
+ Object standbyImpl, int getProxyCountDown) {
this.iface = iface;
this.impl1 = activeImpl;
this.impl2 = standbyImpl;
currentlyActive = impl1;
+ getProxyLatch = new CountDownLatch(getProxyCountDown);
+ }
+
+ public FlipFlopProxyProvider(Class<?> iface, Object activeImpl,
+ Object standbyImpl) {
+ this(iface, activeImpl, standbyImpl, 0);
}
@Override
public Object getProxy() {
+ if (latchEnabled) {
+ getProxyLatch.countDown();
+ try {
+ getProxyLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
return currentlyActive;
}
@Override
- public void performFailover(Object currentProxy) {
+ public synchronized void performFailover(Object currentProxy) {
currentlyActive = impl1 == currentProxy ? impl2 : impl1;
+ failoversOccurred++;
}
@Override
@@ -63,6 +83,13 @@ public class TestFailoverProxy {
// Nothing to do.
}
+ public void setLatchEnabled(boolean latchEnabled) {
+ this.latchEnabled = latchEnabled;
+ }
+
+ public int getFailoversOccurred() {
+ return failoversOccurred;
+ }
}
public static class FailOverOnceOnAnyExceptionPolicy implements RetryPolicy {
@@ -186,4 +213,55 @@ public class TestFailoverProxy {
// IOException and this method is idempotent.
assertEquals("impl2", unreliable.succeedsOnceThenFailsReturningStringIdempotent());
}
-}
+
+ private static class ConcurrentMethodThread extends Thread {
+
+ private UnreliableInterface unreliable;
+ public String result;
+
+ public ConcurrentMethodThread(UnreliableInterface unreliable) {
+ this.unreliable = unreliable;
+ }
+
+ public void run() {
+ try {
+ result = unreliable.failsIfIdentifierDoesntMatch("impl2");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Test that concurrent failed method invocations only result in a single
+ * failover.
+ */
+ @Test
+ public void testConcurrentMethodFailures() throws InterruptedException {
+ FlipFlopProxyProvider proxyProvider = new FlipFlopProxyProvider(
+ UnreliableInterface.class,
+ new UnreliableImplementation("impl1",
+ TypeOfExceptionToFailWith.STANDBY_EXCEPTION),
+ new UnreliableImplementation("impl2",
+ TypeOfExceptionToFailWith.STANDBY_EXCEPTION),
+ 2);
+
+ final UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
+ .create(UnreliableInterface.class, proxyProvider,
+ RetryPolicies.failoverOnNetworkException(10));
+
+ ConcurrentMethodThread t1 = new ConcurrentMethodThread(unreliable);
+ ConcurrentMethodThread t2 = new ConcurrentMethodThread(unreliable);
+
+ // Getting a proxy will now wait on a latch.
+ proxyProvider.setLatchEnabled(true);
+
+ t1.start();
+ t2.start();
+ t1.join();
+ t2.join();
+ assertEquals("impl2", t1.result);
+ assertEquals("impl2", t2.result);
+ assertEquals(1, proxyProvider.getFailoversOccurred());
+ }
+}
\ No newline at end of file
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java?rev=1298080&r1=1298079&r2=1298080&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java Wed Mar 7 20:04:00 2012
@@ -141,4 +141,23 @@ public class UnreliableImplementation im
}
}
+ @Override
+ public String failsIfIdentifierDoesntMatch(String identifier)
+ throws UnreliableException, StandbyException, IOException {
+ if (this.identifier.equals(identifier)) {
+ return identifier;
+ } else {
+ switch (exceptionToFailWith) {
+ case STANDBY_EXCEPTION:
+ throw new StandbyException(identifier);
+ case UNRELIABLE_EXCEPTION:
+ throw new UnreliableException(identifier);
+ case IO_EXCEPTION:
+ throw new IOException(identifier);
+ default:
+ throw new RuntimeException(identifier);
+ }
+ }
+ }
+
}
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableInterface.java?rev=1298080&r1=1298079&r2=1298080&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableInterface.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableInterface.java Wed Mar 7 20:04:00 2012
@@ -63,4 +63,8 @@ public interface UnreliableInterface {
throws UnreliableException, StandbyException, IOException;
public String succeedsTenTimesThenFailsReturningString()
throws UnreliableException, StandbyException, IOException;
+
+ @Idempotent
+ public String failsIfIdentifierDoesntMatch(String identifier)
+ throws UnreliableException, StandbyException, IOException;
}