You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2016/10/26 18:30:45 UTC

[19/50] [abbrv] hadoop git commit: YARN-5711. Propagate exceptions back to client when using hedging RM failover provider.

YARN-5711. Propagate exceptions back to client when using hedging RM failover provider.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0a166b13
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0a166b13
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0a166b13

Branch: refs/heads/YARN-4752
Commit: 0a166b13472213db0a0cd2dfdaddb2b1746b3957
Parents: dc3272b
Author: Subru Krishnan <su...@apache.org>
Authored: Mon Oct 24 18:59:51 2016 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Mon Oct 24 18:59:51 2016 -0700

----------------------------------------------------------------------
 ...stHedgingRequestRMFailoverProxyProvider.java | 31 ++++++-
 .../RequestHedgingRMFailoverProxyProvider.java  | 90 ++++++++++----------
 2 files changed, 74 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a166b13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java
index 6fd6591..30b409e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java
@@ -18,16 +18,19 @@
 
 package org.apache.hadoop.yarn.client;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
-
 public class TestHedgingRequestRMFailoverProxyProvider {
 
   @Test
@@ -63,8 +66,9 @@ public class TestHedgingRequestRMFailoverProxyProvider {
     // Transition rm5 to active;
     long start = System.currentTimeMillis();
     makeRMActive(cluster, 4);
-    // client will retry until the rm becomes active.
-    client.getAllQueues();
+
+    validateActiveRM(client);
+
     long end = System.currentTimeMillis();
     System.out.println("Client call succeeded at " + end);
     // should return the response fast
@@ -76,10 +80,29 @@ public class TestHedgingRequestRMFailoverProxyProvider {
             HAServiceProtocol.RequestSource.REQUEST_BY_USER));
 
     makeRMActive(cluster, 2);
-    client.getAllQueues();
+
+    validateActiveRM(client);
+
     cluster.stop();
   }
 
+  private void validateActiveRM(YarnClient client) throws IOException {
+    // first check if exception is thrown correctly;
+    try {
+      // client will retry until the rm becomes active.
+      client.getApplicationReport(null);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(e instanceof ApplicationNotFoundException);
+    }
+    // now make a valid call.
+    try {
+      client.getAllQueues();
+    } catch (YarnException e) {
+      Assert.fail(e.toString());
+    }
+  }
+
   private void makeRMActive(final MiniYARNCluster cluster, final int index) {
     Thread t = new Thread() {
       @Override public void run() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a166b13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java
index d076599..9468f4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java
@@ -18,16 +18,6 @@
 
 package org.apache.hadoop.yarn.client;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.retry.MultiException;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryProxy;
-import org.apache.hadoop.util.concurrent.HadoopExecutors;
-import org.apache.hadoop.yarn.conf.HAUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
 import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
@@ -39,16 +29,26 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
 /**
  * A FailoverProxyProvider implementation that technically does not "failover"
  * per-se. It constructs a wrapper proxy that sends the request to ALL
  * underlying proxies simultaneously. Each proxy inside the wrapper proxy will
- * retry the corresponding target. It assumes the in an HA setup, there will
- * be only one Active, and the active should respond faster than any configured
+ * retry the corresponding target. It assumes that in an HA setup, there will
+ * be only one Active, and the active should respond faster than any configured
  * standbys. Once it receives a response from any one of the configred proxies,
  * outstanding requests to other proxies are immediately cancelled.
  */
@@ -95,11 +95,11 @@ public class RequestHedgingRMFailoverProxyProvider<T>
       // Create proxy that can retry exceptions properly.
       RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf, false);
       InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
-      T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
+      T proxy = RMProxy.<T> getProxy(conf, protocol, rmAddress);
       return (T) RetryProxy.create(protocol, proxy, retryPolicy);
     } catch (IOException ioe) {
-      LOG.error("Unable to create proxy to the ResourceManager " + HAUtil
-          .getRMHAId(conf), ioe);
+      LOG.error("Unable to create proxy to the ResourceManager "
+          + HAUtil.getRMHAId(conf), ioe);
       return null;
     }
   }
@@ -122,57 +122,61 @@ public class RequestHedgingRMFailoverProxyProvider<T>
       }
     }
 
+    private Throwable extraRootException(Exception ex) {
+      Throwable rootCause = ex;
+      if (ex instanceof ExecutionException) {
+        Throwable cause = ex.getCause();
+        if (cause instanceof InvocationTargetException) {
+          rootCause = cause.getCause();
+        }
+      }
+      return rootCause;
+    }
+
     /**
      * Creates a Executor and invokes all proxies concurrently.
      */
     @Override
-    public Object invoke(Object proxy, final Method method,
-        final Object[] args) throws Throwable {
+    public Object invoke(Object proxy, final Method method, final Object[] args)
+        throws Throwable {
       if (successfulProxy != null) {
-        return invokeMethod(nonRetriableProxy.get(successfulProxy), method, args);
+        return invokeMethod(nonRetriableProxy.get(successfulProxy), method,
+            args);
       }
 
       ExecutorService executor = null;
       CompletionService<Object> completionService;
       try {
         Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>();
-        int numAttempts = 0;
         executor = HadoopExecutors.newFixedThreadPool(allProxies.size());
         completionService = new ExecutorCompletionService<>(executor);
         for (final ProxyInfo<T> pInfo : allProxies.values()) {
           Callable<Object> c = new Callable<Object>() {
-            @Override public Object call() throws Exception {
+            @Override
+            public Object call() throws Exception {
               return method.invoke(pInfo.proxy, args);
             }
           };
           proxyMap.put(completionService.submit(c), pInfo);
-          numAttempts++;
         }
 
-        Map<String, Exception> badResults = new HashMap<>();
-        while (numAttempts > 0) {
-          Future<Object> callResultFuture = completionService.take();
-          String pInfo = proxyMap.get(callResultFuture).proxyInfo;
-          Object retVal;
-          try {
-            retVal = callResultFuture.get();
-            successfulProxy = pInfo;
-            LOG.info("Invocation successful on [" + pInfo + "]");
-            return retVal;
-          } catch (Exception ex) {
-            LOG.warn("Invocation returned exception on " + "[" + pInfo + "]");
-            badResults.put(pInfo, ex);
-            numAttempts--;
-          }
+        Future<Object> callResultFuture = completionService.take();
+        String pInfo = proxyMap.get(callResultFuture).proxyInfo;
+        successfulProxy = pInfo;
+        Object retVal;
+        try {
+          retVal = callResultFuture.get();
+          LOG.info("Invocation successful on [" + pInfo + "]");
+          return retVal;
+        } catch (Exception ex) {
+          // Throw exception from first responding RM so that clients can handle
+          // appropriately
+          Throwable rootCause = extraRootException(ex);
+          LOG.warn("Invocation returned exception: " + rootCause.toString()
+              + " on " + "[" + pInfo + "], so propagating back to caller.");
+          throw rootCause;
         }
 
-        // At this point we should have All bad results (Exceptions)
-        // Or should have returned with successful result.
-        if (badResults.size() == 1) {
-          throw badResults.values().iterator().next();
-        } else {
-          throw new MultiException(badResults);
-        }
       } finally {
         if (executor != null) {
           executor.shutdownNow();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org