You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2016/10/26 18:30:45 UTC
[19/50] [abbrv] hadoop git commit: YARN-5711. Propagate exceptions
back to client when using hedging RM failover provider.
YARN-5711. Propagate exceptions back to client when using hedging RM failover provider.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0a166b13
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0a166b13
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0a166b13
Branch: refs/heads/YARN-4752
Commit: 0a166b13472213db0a0cd2dfdaddb2b1746b3957
Parents: dc3272b
Author: Subru Krishnan <su...@apache.org>
Authored: Mon Oct 24 18:59:51 2016 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Mon Oct 24 18:59:51 2016 -0700
----------------------------------------------------------------------
...stHedgingRequestRMFailoverProxyProvider.java | 31 ++++++-
.../RequestHedgingRMFailoverProxyProvider.java | 90 ++++++++++----------
2 files changed, 74 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a166b13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java
index 6fd6591..30b409e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java
@@ -18,16 +18,19 @@
package org.apache.hadoop.yarn.client;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
import org.junit.Assert;
import org.junit.Test;
-
public class TestHedgingRequestRMFailoverProxyProvider {
@Test
@@ -63,8 +66,9 @@ public class TestHedgingRequestRMFailoverProxyProvider {
// Transition rm5 to active;
long start = System.currentTimeMillis();
makeRMActive(cluster, 4);
- // client will retry until the rm becomes active.
- client.getAllQueues();
+
+ validateActiveRM(client);
+
long end = System.currentTimeMillis();
System.out.println("Client call succeeded at " + end);
// should return the response fast
@@ -76,10 +80,29 @@ public class TestHedgingRequestRMFailoverProxyProvider {
HAServiceProtocol.RequestSource.REQUEST_BY_USER));
makeRMActive(cluster, 2);
- client.getAllQueues();
+
+ validateActiveRM(client);
+
cluster.stop();
}
+ private void validateActiveRM(YarnClient client) throws IOException {
+ // first check if exception is thrown correctly;
+ try {
+ // client will retry until the rm becomes active.
+ client.getApplicationReport(null);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertTrue(e instanceof ApplicationNotFoundException);
+ }
+ // now make a valid call.
+ try {
+ client.getAllQueues();
+ } catch (YarnException e) {
+ Assert.fail(e.toString());
+ }
+ }
+
private void makeRMActive(final MiniYARNCluster cluster, final int index) {
Thread t = new Thread() {
@Override public void run() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a166b13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java
index d076599..9468f4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java
@@ -18,16 +18,6 @@
package org.apache.hadoop.yarn.client;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.retry.MultiException;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryProxy;
-import org.apache.hadoop.util.concurrent.HadoopExecutors;
-import org.apache.hadoop.yarn.conf.HAUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
@@ -39,16 +29,26 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
/**
* A FailoverProxyProvider implementation that technically does not "failover"
* per-se. It constructs a wrapper proxy that sends the request to ALL
* underlying proxies simultaneously. Each proxy inside the wrapper proxy will
- * retry the corresponding target. It assumes the in an HA setup, there will
- * be only one Active, and the active should respond faster than any configured
+ * retry the corresponding target. It assumes the in an HA setup, there will be
+ * only one Active, and the active should respond faster than any configured
* standbys. Once it receives a response from any one of the configured proxies,
* outstanding requests to other proxies are immediately cancelled.
*/
@@ -95,11 +95,11 @@ public class RequestHedgingRMFailoverProxyProvider<T>
// Create proxy that can retry exceptions properly.
RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf, false);
InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
- T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
+ T proxy = RMProxy.<T> getProxy(conf, protocol, rmAddress);
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
} catch (IOException ioe) {
- LOG.error("Unable to create proxy to the ResourceManager " + HAUtil
- .getRMHAId(conf), ioe);
+ LOG.error("Unable to create proxy to the ResourceManager "
+ + HAUtil.getRMHAId(conf), ioe);
return null;
}
}
@@ -122,57 +122,61 @@ public class RequestHedgingRMFailoverProxyProvider<T>
}
}
+ private Throwable extraRootException(Exception ex) {
+ Throwable rootCause = ex;
+ if (ex instanceof ExecutionException) {
+ Throwable cause = ex.getCause();
+ if (cause instanceof InvocationTargetException) {
+ rootCause = cause.getCause();
+ }
+ }
+ return rootCause;
+ }
+
/**
* Creates a Executor and invokes all proxies concurrently.
*/
@Override
- public Object invoke(Object proxy, final Method method,
- final Object[] args) throws Throwable {
+ public Object invoke(Object proxy, final Method method, final Object[] args)
+ throws Throwable {
if (successfulProxy != null) {
- return invokeMethod(nonRetriableProxy.get(successfulProxy), method, args);
+ return invokeMethod(nonRetriableProxy.get(successfulProxy), method,
+ args);
}
ExecutorService executor = null;
CompletionService<Object> completionService;
try {
Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>();
- int numAttempts = 0;
executor = HadoopExecutors.newFixedThreadPool(allProxies.size());
completionService = new ExecutorCompletionService<>(executor);
for (final ProxyInfo<T> pInfo : allProxies.values()) {
Callable<Object> c = new Callable<Object>() {
- @Override public Object call() throws Exception {
+ @Override
+ public Object call() throws Exception {
return method.invoke(pInfo.proxy, args);
}
};
proxyMap.put(completionService.submit(c), pInfo);
- numAttempts++;
}
- Map<String, Exception> badResults = new HashMap<>();
- while (numAttempts > 0) {
- Future<Object> callResultFuture = completionService.take();
- String pInfo = proxyMap.get(callResultFuture).proxyInfo;
- Object retVal;
- try {
- retVal = callResultFuture.get();
- successfulProxy = pInfo;
- LOG.info("Invocation successful on [" + pInfo + "]");
- return retVal;
- } catch (Exception ex) {
- LOG.warn("Invocation returned exception on " + "[" + pInfo + "]");
- badResults.put(pInfo, ex);
- numAttempts--;
- }
+ Future<Object> callResultFuture = completionService.take();
+ String pInfo = proxyMap.get(callResultFuture).proxyInfo;
+ successfulProxy = pInfo;
+ Object retVal;
+ try {
+ retVal = callResultFuture.get();
+ LOG.info("Invocation successful on [" + pInfo + "]");
+ return retVal;
+ } catch (Exception ex) {
+ // Throw exception from first responding RM so that clients can handle
+ // appropriately
+ Throwable rootCause = extraRootException(ex);
+ LOG.warn("Invocation returned exception: " + rootCause.toString()
+ + " on " + "[" + pInfo + "], so propagating back to caller.");
+ throw rootCause;
}
- // At this point we should have All bad results (Exceptions)
- // Or should have returned with successful result.
- if (badResults.size() == 1) {
- throw badResults.values().iterator().next();
- } else {
- throw new MultiException(badResults);
- }
} finally {
if (executor != null) {
executor.shutdownNow();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org