Posted to common-commits@hadoop.apache.org by as...@apache.org on 2015/07/28 08:02:39 UTC
hadoop git commit: HDFS-7858. Improve HA Namenode Failover detection on the client. (asuresh)
Repository: hadoop
Updated Branches:
refs/heads/trunk e21dde501 -> 030fcfa99
HDFS-7858. Improve HA Namenode Failover detection on the client. (asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/030fcfa9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/030fcfa9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/030fcfa9
Branch: refs/heads/trunk
Commit: 030fcfa99c345ad57625486eeabedebf2fd4411f
Parents: e21dde5
Author: Arun Suresh <as...@apache.org>
Authored: Mon Jul 27 23:02:03 2015 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Mon Jul 27 23:02:03 2015 -0700
----------------------------------------------------------------------
.../apache/hadoop/io/retry/MultiException.java | 49 +++
.../hadoop/io/retry/RetryInvocationHandler.java | 99 +++++-
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../ha/ConfiguredFailoverProxyProvider.java | 52 ++-
.../ha/RequestHedgingProxyProvider.java | 186 ++++++++++
.../markdown/HDFSHighAvailabilityWithNFS.md | 9 +-
.../markdown/HDFSHighAvailabilityWithQJM.md | 10 +-
.../ha/TestRequestHedgingProxyProvider.java | 350 +++++++++++++++++++
8 files changed, 724 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/030fcfa9/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/MultiException.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/MultiException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/MultiException.java
new file mode 100644
index 0000000..4963a2d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/MultiException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.io.retry;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Holder class that clients can use to return multiple exceptions.
+ */
+public class MultiException extends IOException {
+
+ private final Map<String, Exception> exes;
+
+ public MultiException(Map<String, Exception> exes) {
+ this.exes = exes;
+ }
+
+ public Map<String, Exception> getExceptions() {
+ return exes;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("{");
+ for (Exception e : exes.values()) {
+ sb.append(e.toString()).append(", ");
+ }
+ sb.append("}");
+ return "MultiException[" + sb.toString() + "]";
+ }
+}
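A minimal sketch (not part of the commit) of how a caller might build and inspect a MultiException; the proxy labels and messages below are invented for illustration:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.io.retry.MultiException;

    public class MultiExceptionSketch {
      public static void main(String[] args) {
        // One exception per failed target, keyed by a label of the caller's choosing.
        Map<String, Exception> failures = new HashMap<>();
        failures.put("nn1", new IOException("connection refused"));
        failures.put("nn2", new IOException("socket timeout"));

        MultiException me = new MultiException(failures);
        // getExceptions() returns the underlying map, so each failure can be
        // examined (or fed to a RetryPolicy) individually.
        for (Map.Entry<String, Exception> e : me.getExceptions().entrySet()) {
          System.out.println(e.getKey() + " -> " + e.getValue());
        }
      }
    }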
http://git-wip-us.apache.org/repos/asf/hadoop/blob/030fcfa9/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
index 543567e..9256356 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
@@ -23,6 +23,8 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
@@ -101,7 +103,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
Object ret = invokeMethod(method, args);
hasMadeASuccessfulCall = true;
return ret;
- } catch (Exception e) {
+ } catch (Exception ex) {
boolean isIdempotentOrAtMostOnce = proxyProvider.getInterface()
.getMethod(method.getName(), method.getParameterTypes())
.isAnnotationPresent(Idempotent.class);
@@ -110,15 +112,16 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
.getMethod(method.getName(), method.getParameterTypes())
.isAnnotationPresent(AtMostOnce.class);
}
- RetryAction action = policy.shouldRetry(e, retries++,
- invocationFailoverCount, isIdempotentOrAtMostOnce);
- if (action.action == RetryAction.RetryDecision.FAIL) {
- if (action.reason != null) {
+ List<RetryAction> actions = extractActions(policy, ex, retries++,
+ invocationFailoverCount, isIdempotentOrAtMostOnce);
+ RetryAction failAction = getFailAction(actions);
+ if (failAction != null) {
+ if (failAction.reason != null) {
LOG.warn("Exception while invoking " + currentProxy.proxy.getClass()
+ "." + method.getName() + " over " + currentProxy.proxyInfo
- + ". Not retrying because " + action.reason, e);
+ + ". Not retrying because " + failAction.reason, ex);
}
- throw e;
+ throw ex;
} else { // retry or failover
// avoid logging the failover if this is the first call on this
// proxy object, and we successfully achieve the failover without
@@ -126,8 +129,9 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
boolean worthLogging =
!(invocationFailoverCount == 0 && !hasMadeASuccessfulCall);
worthLogging |= LOG.isDebugEnabled();
- if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY &&
- worthLogging) {
+ RetryAction failOverAction = getFailOverAction(actions);
+ long delay = getDelayMillis(actions);
+ if (failOverAction != null && worthLogging) {
String msg = "Exception while invoking " + method.getName()
+ " of class " + currentProxy.proxy.getClass().getSimpleName()
+ " over " + currentProxy.proxyInfo;
@@ -135,22 +139,22 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
if (invocationFailoverCount > 0) {
msg += " after " + invocationFailoverCount + " fail over attempts";
}
- msg += ". Trying to fail over " + formatSleepMessage(action.delayMillis);
- LOG.info(msg, e);
+ msg += ". Trying to fail over " + formatSleepMessage(delay);
+ LOG.info(msg, ex);
} else {
if(LOG.isDebugEnabled()) {
LOG.debug("Exception while invoking " + method.getName()
+ " of class " + currentProxy.proxy.getClass().getSimpleName()
+ " over " + currentProxy.proxyInfo + ". Retrying "
- + formatSleepMessage(action.delayMillis), e);
+ + formatSleepMessage(delay), ex);
}
}
-
- if (action.delayMillis > 0) {
- Thread.sleep(action.delayMillis);
+
+ if (delay > 0) {
+ Thread.sleep(delay);
}
- if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) {
+ if (failOverAction != null) {
// Make sure that concurrent failed method invocations only cause a
// single actual fail over.
synchronized (proxyProvider) {
@@ -169,7 +173,68 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
}
}
}
-
+
+ /**
+ * Obtain a retry delay from list of RetryActions.
+ */
+ private long getDelayMillis(List<RetryAction> actions) {
+ long retVal = 0;
+ for (RetryAction action : actions) {
+ if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY ||
+ action.action == RetryAction.RetryDecision.RETRY) {
+ if (action.delayMillis > retVal) {
+ retVal = action.delayMillis;
+ }
+ }
+ }
+ return retVal;
+ }
+
+ /**
+ * Return the first FAILOVER_AND_RETRY action.
+ */
+ private RetryAction getFailOverAction(List<RetryAction> actions) {
+ for (RetryAction action : actions) {
+ if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) {
+ return action;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Return the last FAIL action, and only if there are no RETRY actions.
+ */
+ private RetryAction getFailAction(List<RetryAction> actions) {
+ RetryAction fAction = null;
+ for (RetryAction action : actions) {
+ if (action.action == RetryAction.RetryDecision.FAIL) {
+ fAction = action;
+ } else {
+ // At least 1 RETRY action
+ return null;
+ }
+ }
+ return fAction;
+ }
+
+ private List<RetryAction> extractActions(RetryPolicy policy, Exception ex,
+ int i, int invocationFailoverCount,
+ boolean isIdempotentOrAtMostOnce)
+ throws Exception {
+ List<RetryAction> actions = new LinkedList<>();
+ if (ex instanceof MultiException) {
+ for (Exception th : ((MultiException) ex).getExceptions().values()) {
+ actions.add(policy.shouldRetry(th, i, invocationFailoverCount,
+ isIdempotentOrAtMostOnce));
+ }
+ } else {
+ actions.add(policy.shouldRetry(ex, i,
+ invocationFailoverCount, isIdempotentOrAtMostOnce));
+ }
+ return actions;
+ }
+
private static String formatSleepMessage(long millis) {
if (millis > 0) {
return "after sleeping for " + millis + "ms.";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/030fcfa9/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index cc2a833..9b2de81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -753,6 +753,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8735. Inotify: All events classes should implement toString() API.
(Surendra Singh Lilhore via aajisaka)
+ HDFS-7858. Improve HA Namenode Failover detection on the client. (asuresh)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/030fcfa9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
index 235c886..ccce736 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
@@ -51,16 +53,40 @@ public class ConfiguredFailoverProxyProvider<T> extends
private static final Log LOG =
LogFactory.getLog(ConfiguredFailoverProxyProvider.class);
- private final Configuration conf;
- private final List<AddressRpcProxyPair<T>> proxies =
+ interface ProxyFactory<T> {
+ T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
+ UserGroupInformation ugi, boolean withRetries,
+ AtomicBoolean fallbackToSimpleAuth) throws IOException;
+ }
+
+ static class DefaultProxyFactory<T> implements ProxyFactory<T> {
+ @Override
+ public T createProxy(Configuration conf, InetSocketAddress nnAddr,
+ Class<T> xface, UserGroupInformation ugi, boolean withRetries,
+ AtomicBoolean fallbackToSimpleAuth) throws IOException {
+ return NameNodeProxies.createNonHAProxy(conf,
+ nnAddr, xface, ugi, false, fallbackToSimpleAuth).getProxy();
+ }
+ }
+
+ protected final Configuration conf;
+ protected final List<AddressRpcProxyPair<T>> proxies =
new ArrayList<AddressRpcProxyPair<T>>();
private final UserGroupInformation ugi;
- private final Class<T> xface;
-
+ protected final Class<T> xface;
+
private int currentProxyIndex = 0;
+ private final ProxyFactory<T> factory;
public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface) {
+ this(conf, uri, xface, new DefaultProxyFactory<T>());
+ }
+
+ @VisibleForTesting
+ ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
+ Class<T> xface, ProxyFactory<T> factory) {
+
Preconditions.checkArgument(
xface.isAssignableFrom(NamenodeProtocols.class),
"Interface class %s is not a valid NameNode protocol!");
@@ -78,9 +104,10 @@ public class ConfiguredFailoverProxyProvider<T> extends
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
this.conf.setInt(
- CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
- maxRetriesOnSocketTimeouts);
-
+ CommonConfigurationKeysPublic
+ .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
+ maxRetriesOnSocketTimeouts);
+
try {
ugi = UserGroupInformation.getCurrentUser();
@@ -102,6 +129,7 @@ public class ConfiguredFailoverProxyProvider<T> extends
// URI of the cluster. Clone this token to apply to each of the
// underlying IPC addresses so that the IPC code can find it.
HAUtil.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);
+ this.factory = factory;
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -120,8 +148,8 @@ public class ConfiguredFailoverProxyProvider<T> extends
AddressRpcProxyPair<T> current = proxies.get(currentProxyIndex);
if (current.namenode == null) {
try {
- current.namenode = NameNodeProxies.createNonHAProxy(conf,
- current.address, xface, ugi, false, fallbackToSimpleAuth).getProxy();
+ current.namenode = factory.createProxy(conf,
+ current.address, xface, ugi, false, fallbackToSimpleAuth);
} catch (IOException e) {
LOG.error("Failed to create RPC proxy to NameNode", e);
throw new RuntimeException(e);
@@ -131,7 +159,11 @@ public class ConfiguredFailoverProxyProvider<T> extends
}
@Override
- public synchronized void performFailover(T currentProxy) {
+ public void performFailover(T currentProxy) {
+ incrementProxyIndex();
+ }
+
+ synchronized void incrementProxyIndex() {
currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/030fcfa9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java
new file mode 100644
index 0000000..b7216b0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.io.retry.MultiException;
+
+/**
+ * A FailoverProxyProvider implementation that technically does not "fail over"
+ * per se. It constructs a wrapper proxy that sends the request to ALL
+ * underlying proxies simultaneously. It assumes that in an HA setup, there will
+ * be only one Active NameNode, which should respond faster than any configured
+ * standbys. Once it receives a response from any one of the configured proxies,
+ * outstanding requests to the other proxies are immediately cancelled.
+ */
+public class RequestHedgingProxyProvider<T> extends
+ ConfiguredFailoverProxyProvider<T> {
+
+ private static final Log LOG =
+ LogFactory.getLog(RequestHedgingProxyProvider.class);
+
+ class RequestHedgingInvocationHandler implements InvocationHandler {
+
+ final Map<String, ProxyInfo<T>> targetProxies;
+
+ public RequestHedgingInvocationHandler(
+ Map<String, ProxyInfo<T>> targetProxies) {
+ this.targetProxies = new HashMap<>(targetProxies);
+ }
+
+ /**
+ * Creates an Executor and invokes all proxies concurrently. This
+ * implementation assumes that clients have configured proper socket
+ * timeouts, else the call can block forever.
+ *
+ * @param proxy the dynamic proxy instance the call was made on
+ * @param method the method being invoked
+ * @param args the method arguments
+ * @return the return value from the first proxy that responds successfully
+ * @throws Throwable the single failure, or a MultiException if all proxies fail
+ */
+ @Override
+ public Object
+ invoke(Object proxy, final Method method, final Object[] args)
+ throws Throwable {
+ Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>();
+ int numAttempts = 0;
+
+ ExecutorService executor = null;
+ CompletionService<Object> completionService;
+ try {
+ // Optimization: if only 2 proxies are configured and one had failed
+ // over, then we don't need to create a thread pool etc.
+ targetProxies.remove(toIgnore);
+ if (targetProxies.size() == 1) {
+ ProxyInfo<T> proxyInfo = targetProxies.values().iterator().next();
+ Object retVal = method.invoke(proxyInfo.proxy, args);
+ successfulProxy = proxyInfo;
+ return retVal;
+ }
+ executor = Executors.newFixedThreadPool(proxies.size());
+ completionService = new ExecutorCompletionService<>(executor);
+ for (final Map.Entry<String, ProxyInfo<T>> pEntry :
+ targetProxies.entrySet()) {
+ Callable<Object> c = new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ return method.invoke(pEntry.getValue().proxy, args);
+ }
+ };
+ proxyMap.put(completionService.submit(c), pEntry.getValue());
+ numAttempts++;
+ }
+
+ Map<String, Exception> badResults = new HashMap<>();
+ while (numAttempts > 0) {
+ Future<Object> callResultFuture = completionService.take();
+ Object retVal;
+ try {
+ retVal = callResultFuture.get();
+ successfulProxy = proxyMap.get(callResultFuture);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Invocation successful on ["
+ + successfulProxy.proxyInfo + "]");
+ }
+ return retVal;
+ } catch (Exception ex) {
+ ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture);
+ LOG.warn("Invocation returned exception on "
+ + "[" + tProxyInfo.proxyInfo + "]");
+ badResults.put(tProxyInfo.proxyInfo, ex);
+ numAttempts--;
+ }
+ }
+
+ // At this point we have only bad results (exceptions);
+ // otherwise we would already have returned the successful result.
+ if (badResults.size() == 1) {
+ throw badResults.values().iterator().next();
+ } else {
+ throw new MultiException(badResults);
+ }
+ } finally {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ }
+ }
+ }
+
+
+ private volatile ProxyInfo<T> successfulProxy = null;
+ private volatile String toIgnore = null;
+
+ public RequestHedgingProxyProvider(
+ Configuration conf, URI uri, Class<T> xface) {
+ this(conf, uri, xface, new DefaultProxyFactory<T>());
+ }
+
+ @VisibleForTesting
+ RequestHedgingProxyProvider(Configuration conf, URI uri,
+ Class<T> xface, ProxyFactory<T> factory) {
+ super(conf, uri, xface, factory);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public synchronized ProxyInfo<T> getProxy() {
+ if (successfulProxy != null) {
+ return successfulProxy;
+ }
+ Map<String, ProxyInfo<T>> targetProxyInfos = new HashMap<>();
+ StringBuilder combinedInfo = new StringBuilder("[");
+ for (int i = 0; i < proxies.size(); i++) {
+ ProxyInfo<T> pInfo = super.getProxy();
+ incrementProxyIndex();
+ targetProxyInfos.put(pInfo.proxyInfo, pInfo);
+ combinedInfo.append(pInfo.proxyInfo).append(',');
+ }
+ combinedInfo.append(']');
+ T wrappedProxy = (T) Proxy.newProxyInstance(
+ RequestHedgingInvocationHandler.class.getClassLoader(),
+ new Class<?>[]{xface},
+ new RequestHedgingInvocationHandler(targetProxyInfos));
+ return new ProxyInfo<T>(wrappedProxy, combinedInfo.toString());
+ }
+
+ @Override
+ public synchronized void performFailover(T currentProxy) {
+ toIgnore = successfulProxy.proxyInfo;
+ successfulProxy = null;
+ }
+
+}
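As a side note (not part of the commit), the hedging pattern used above -- submit the call to every candidate, keep the first successful answer, cancel the rest -- can be illustrated in isolation with a plain ExecutorCompletionService; the two Callable bodies below are placeholders standing in for RPC invocations:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class HedgingPatternSketch {
      public static void main(String[] args) throws Exception {
        List<Callable<String>> candidates = new ArrayList<>();
        candidates.add(() -> { throw new RuntimeException("standby rejects the call"); });
        candidates.add(() -> { Thread.sleep(100); return "answer from the active"; });

        ExecutorService executor = Executors.newFixedThreadPool(candidates.size());
        CompletionService<String> completionService =
            new ExecutorCompletionService<>(executor);
        try {
          for (Callable<String> c : candidates) {
            completionService.submit(c);
          }
          int remaining = candidates.size();
          while (remaining > 0) {
            Future<String> done = completionService.take(); // first completed call, good or bad
            try {
              System.out.println(done.get());               // first success wins
              return;
            } catch (Exception e) {
              remaining--;                                  // record the failure, keep waiting
            }
          }
          System.out.println("every candidate failed");
        } finally {
          executor.shutdownNow();                           // cancel whatever is still running
        }
      }
    }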
http://git-wip-us.apache.org/repos/asf/hadoop/blob/030fcfa9/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithNFS.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithNFS.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithNFS.md
index cc53a38..51a88c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithNFS.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithNFS.md
@@ -195,9 +195,12 @@ The order in which you set these configurations is unimportant, but the values y
Configure the name of the Java class which will be used by the DFS Client to
determine which NameNode is the current Active, and therefore which NameNode is
- currently serving client requests. The only implementation which currently
- ships with Hadoop is the **ConfiguredFailoverProxyProvider**, so use this
- unless you are using a custom one. For example:
+ currently serving client requests. The two implementations which currently
+ ship with Hadoop are the **ConfiguredFailoverProxyProvider** and the
+ **RequestHedgingProxyProvider** (which, for the first call, concurrently invokes all
+ namenodes to determine the active one, and on subsequent requests, invokes the active
+ namenode until a fail-over happens), so use one of these unless you are using a custom
+ proxy provider. For example:
<property>
<name>dfs.client.failover.proxy.provider.mycluster</name>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/030fcfa9/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md
index d9d9a67..8b42386 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md
@@ -216,9 +216,13 @@ The order in which you set these configurations is unimportant, but the values y
Configure the name of the Java class which will be used by the DFS Client to
determine which NameNode is the current Active, and therefore which NameNode is
- currently serving client requests. The only implementation which currently
- ships with Hadoop is the **ConfiguredFailoverProxyProvider**, so use this
- unless you are using a custom one. For example:
+ currently serving client requests. The two implementations which currently
+ ship with Hadoop are the **ConfiguredFailoverProxyProvider** and the
+ **RequestHedgingProxyProvider** (which, for the first call, concurrently invokes all
+ namenodes to determine the active one, and on subsequent requests, invokes the active
+ namenode until a fail-over happens), so use one of these unless you are using a custom
+ proxy provider.
+ For example:
<property>
<name>dfs.client.failover.proxy.provider.mycluster</name>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/030fcfa9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java
new file mode 100644
index 0000000..32f807a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.ProxyFactory;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.retry.MultiException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.collect.Lists;
+
+public class TestRequestHedgingProxyProvider {
+
+ private Configuration conf;
+ private URI nnUri;
+ private String ns;
+
+ @Before
+ public void setup() throws URISyntaxException {
+ ns = "mycluster-" + Time.monotonicNow();
+ nnUri = new URI("hdfs://" + ns);
+ conf = new Configuration();
+ conf.set(DFSConfigKeys.DFS_NAMESERVICES, ns);
+ conf.set(
+ DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, "nn1,nn2");
+ conf.set(
+ DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn1",
+ "machine1.foo.bar:8020");
+ conf.set(
+ DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn2",
+ "machine2.foo.bar:8020");
+ }
+
+ @Test
+ public void testHedgingWhenOneFails() throws Exception {
+ final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
+ Mockito.when(goodMock.getStats()).thenReturn(new long[] {1});
+ final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
+ Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
+
+ RequestHedgingProxyProvider<NamenodeProtocols> provider =
+ new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
+ createFactory(goodMock, badMock));
+ long[] stats = provider.getProxy().proxy.getStats();
+ Assert.assertTrue(stats.length == 1);
+ Mockito.verify(badMock).getStats();
+ Mockito.verify(goodMock).getStats();
+ }
+
+ @Test
+ public void testHedgingWhenOneIsSlow() throws Exception {
+ final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
+ Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
+ @Override
+ public long[] answer(InvocationOnMock invocation) throws Throwable {
+ Thread.sleep(1000);
+ return new long[]{1};
+ }
+ });
+ final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
+ Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
+
+ RequestHedgingProxyProvider<NamenodeProtocols> provider =
+ new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
+ createFactory(goodMock, badMock));
+ long[] stats = provider.getProxy().proxy.getStats();
+ Assert.assertTrue(stats.length == 1);
+ Assert.assertEquals(1, stats[0]);
+ Mockito.verify(badMock).getStats();
+ Mockito.verify(goodMock).getStats();
+ }
+
+ @Test
+ public void testHedgingWhenBothFail() throws Exception {
+ NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
+ Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
+ NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class);
+ Mockito.when(worseMock.getStats()).thenThrow(
+ new IOException("Worse mock !!"));
+
+ RequestHedgingProxyProvider<NamenodeProtocols> provider =
+ new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
+ createFactory(badMock, worseMock));
+ try {
+ provider.getProxy().proxy.getStats();
+ Assert.fail("Should fail since both namenodes throw IOException !!");
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof MultiException);
+ }
+ Mockito.verify(badMock).getStats();
+ Mockito.verify(worseMock).getStats();
+ }
+
+ @Test
+ public void testPerformFailover() throws Exception {
+ final AtomicInteger counter = new AtomicInteger(0);
+ final int[] isGood = {1};
+ final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
+ Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
+ @Override
+ public long[] answer(InvocationOnMock invocation) throws Throwable {
+ counter.incrementAndGet();
+ if (isGood[0] == 1) {
+ Thread.sleep(1000);
+ return new long[]{1};
+ }
+ throw new IOException("Was Good mock !!");
+ }
+ });
+ final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
+ Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() {
+ @Override
+ public long[] answer(InvocationOnMock invocation) throws Throwable {
+ counter.incrementAndGet();
+ if (isGood[0] == 2) {
+ Thread.sleep(1000);
+ return new long[]{2};
+ }
+ throw new IOException("Bad mock !!");
+ }
+ });
+ RequestHedgingProxyProvider<NamenodeProtocols> provider =
+ new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
+ createFactory(goodMock, badMock));
+ long[] stats = provider.getProxy().proxy.getStats();
+ Assert.assertTrue(stats.length == 1);
+ Assert.assertEquals(1, stats[0]);
+ Assert.assertEquals(2, counter.get());
+ Mockito.verify(badMock).getStats();
+ Mockito.verify(goodMock).getStats();
+
+ stats = provider.getProxy().proxy.getStats();
+ Assert.assertTrue(stats.length == 1);
+ Assert.assertEquals(1, stats[0]);
+ // Ensure only the previous successful one is invoked
+ Mockito.verifyNoMoreInteractions(badMock);
+ Assert.assertEquals(3, counter.get());
+
+ // Flip to standby.. so now this should fail
+ isGood[0] = 2;
+ try {
+ provider.getProxy().proxy.getStats();
+ Assert.fail("Should fail since previously successful proxy now fails ");
+ } catch (Exception ex) {
+ Assert.assertTrue(ex instanceof IOException);
+ }
+
+ Assert.assertEquals(4, counter.get());
+
+ provider.performFailover(provider.getProxy().proxy);
+ stats = provider.getProxy().proxy.getStats();
+ Assert.assertTrue(stats.length == 1);
+ Assert.assertEquals(2, stats[0]);
+
+ // Counter should update only once
+ Assert.assertEquals(5, counter.get());
+
+ stats = provider.getProxy().proxy.getStats();
+ Assert.assertTrue(stats.length == 1);
+ Assert.assertEquals(2, stats[0]);
+
+ // Counter updates only once now
+ Assert.assertEquals(6, counter.get());
+
+ // Flip back to old active.. so now this should fail
+ isGood[0] = 1;
+ try {
+ provider.getProxy().proxy.getStats();
+ Assert.fail("Should fail since previously successful proxy now fails ");
+ } catch (Exception ex) {
+ Assert.assertTrue(ex instanceof IOException);
+ }
+
+ Assert.assertEquals(7, counter.get());
+
+ provider.performFailover(provider.getProxy().proxy);
+ stats = provider.getProxy().proxy.getStats();
+ Assert.assertTrue(stats.length == 1);
+ // Ensure correct proxy was called
+ Assert.assertEquals(1, stats[0]);
+ }
+
+ @Test
+ public void testPerformFailoverWith3Proxies() throws Exception {
+ conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
+ "nn1,nn2,nn3");
+ conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn3",
+ "machine3.foo.bar:8020");
+
+ final AtomicInteger counter = new AtomicInteger(0);
+ final int[] isGood = {1};
+ final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
+ Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
+ @Override
+ public long[] answer(InvocationOnMock invocation) throws Throwable {
+ counter.incrementAndGet();
+ if (isGood[0] == 1) {
+ Thread.sleep(1000);
+ return new long[]{1};
+ }
+ throw new IOException("Was Good mock !!");
+ }
+ });
+ final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
+ Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() {
+ @Override
+ public long[] answer(InvocationOnMock invocation) throws Throwable {
+ counter.incrementAndGet();
+ if (isGood[0] == 2) {
+ Thread.sleep(1000);
+ return new long[]{2};
+ }
+ throw new IOException("Bad mock !!");
+ }
+ });
+ final NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class);
+ Mockito.when(worseMock.getStats()).thenAnswer(new Answer<long[]>() {
+ @Override
+ public long[] answer(InvocationOnMock invocation) throws Throwable {
+ counter.incrementAndGet();
+ if (isGood[0] == 3) {
+ Thread.sleep(1000);
+ return new long[]{3};
+ }
+ throw new IOException("Worse mock !!");
+ }
+ });
+
+ RequestHedgingProxyProvider<NamenodeProtocols> provider =
+ new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
+ createFactory(goodMock, badMock, worseMock));
+ long[] stats = provider.getProxy().proxy.getStats();
+ Assert.assertTrue(stats.length == 1);
+ Assert.assertEquals(1, stats[0]);
+ Assert.assertEquals(3, counter.get());
+ Mockito.verify(badMock).getStats();
+ Mockito.verify(goodMock).getStats();
+ Mockito.verify(worseMock).getStats();
+
+ stats = provider.getProxy().proxy.getStats();
+ Assert.assertTrue(stats.length == 1);
+ Assert.assertEquals(1, stats[0]);
+ // Ensure only the previous successful one is invoked
+ Mockito.verifyNoMoreInteractions(badMock);
+ Mockito.verifyNoMoreInteractions(worseMock);
+ Assert.assertEquals(4, counter.get());
+
+ // Flip to standby.. so now this should fail
+ isGood[0] = 2;
+ try {
+ provider.getProxy().proxy.getStats();
+ Assert.fail("Should fail since previously successful proxy now fails ");
+ } catch (Exception ex) {
+ Assert.assertTrue(ex instanceof IOException);
+ }
+
+ Assert.assertEquals(5, counter.get());
+
+ provider.performFailover(provider.getProxy().proxy);
+ stats = provider.getProxy().proxy.getStats();
+ Assert.assertTrue(stats.length == 1);
+ Assert.assertEquals(2, stats[0]);
+
+ // Counter updates twice since both proxies are tried on failure
+ Assert.assertEquals(7, counter.get());
+
+ stats = provider.getProxy().proxy.getStats();
+ Assert.assertTrue(stats.length == 1);
+ Assert.assertEquals(2, stats[0]);
+
+ // Counter updates only once now
+ Assert.assertEquals(8, counter.get());
+
+ // Flip to Other standby.. so now this should fail
+ isGood[0] = 3;
+ try {
+ provider.getProxy().proxy.getStats();
+ Assert.fail("Should fail since previously successful proxy now fails ");
+ } catch (Exception ex) {
+ Assert.assertTrue(ex instanceof IOException);
+ }
+
+ // Counter should update only once
+ Assert.assertEquals(9, counter.get());
+
+ provider.performFailover(provider.getProxy().proxy);
+ stats = provider.getProxy().proxy.getStats();
+ Assert.assertTrue(stats.length == 1);
+
+ // Ensure correct proxy was called
+ Assert.assertEquals(3, stats[0]);
+
+ // Counter updates twice since both proxies are tried on failure
+ Assert.assertEquals(11, counter.get());
+
+ stats = provider.getProxy().proxy.getStats();
+ Assert.assertTrue(stats.length == 1);
+ Assert.assertEquals(3, stats[0]);
+
+ // Counter updates only once now
+ Assert.assertEquals(12, counter.get());
+ }
+
+ private ProxyFactory<NamenodeProtocols> createFactory(
+ NamenodeProtocols... protos) {
+ final Iterator<NamenodeProtocols> iterator =
+ Lists.newArrayList(protos).iterator();
+ return new ProxyFactory<NamenodeProtocols>() {
+ @Override
+ public NamenodeProtocols createProxy(Configuration conf,
+ InetSocketAddress nnAddr, Class<NamenodeProtocols> xface,
+ UserGroupInformation ugi, boolean withRetries,
+ AtomicBoolean fallbackToSimpleAuth) throws IOException {
+ return iterator.next();
+ }
+ };
+ }
+}