You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2016/01/08 00:08:35 UTC

hadoop git commit: YARN-4180. AMLauncher does not retry on failures when talking to NM. (adhoot)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6 79da1283d -> 6b2abb751


YARN-4180. AMLauncher does not retry on failures when talking to NM. (adhoot)

(cherry picked from commit 9735afe967a660f356e953348cb6c34417f41055)

Conflicts:
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java

(cherry picked from commit 22f2501476d987afb7bc19080a7a0db94ea72be6)

Conflicts:
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java

(cherry picked from commit 7c9a368b45b0e38173521a94ab32dee8a2984bf8)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6b2abb75
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6b2abb75
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6b2abb75

Branch: refs/heads/branch-2.6
Commit: 6b2abb7515b62abfbf34e0a512f33750209db27d
Parents: 79da128
Author: Anubhav Dhoot <ad...@apache.org>
Authored: Mon Sep 28 15:30:17 2015 -0700
Committer: Karthik Kambatla <ka...@cloudera.com>
Committed: Thu Jan 7 15:08:05 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  4 ++
 .../resourcemanager/amlauncher/AMLauncher.java  | 23 +++----
 .../yarn/server/resourcemanager/MockRM.java     | 13 ++--
 .../TestApplicationMasterLauncher.java          | 71 +++++++++++++++++++-
 4 files changed, 92 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b2abb75/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index fe921b5..e46f2f7 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -40,6 +40,10 @@ Release 2.6.4 - UNRELEASED
     YARN-4380. TestResourceLocalizationService.testDownloadingResourcesOnContainerKill
     fails intermittently. (Varun Saxena via ozawa)
 
+    YARN-4180. AMLauncher does not retry on failures when talking to NM.
+    (adhoot)
+
+
 Release 2.6.3 - 2015-12-17
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b2abb75/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
index 0dd9ba1..f5ecbaa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.amlauncher;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -50,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.NMProxy;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -150,10 +150,10 @@ public class AMLauncher implements Runnable {
       final ContainerId containerId) {
 
     final NodeId node = masterContainer.getNodeId();
-    final InetSocketAddress containerManagerBindAddress =
+    final InetSocketAddress containerManagerConnectAddress =
         NetUtils.createSocketAddrForHost(node.getHost(), node.getPort());
 
-    final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
+    final YarnRPC rpc = getYarnRPC();
 
     UserGroupInformation currentUser =
         UserGroupInformation.createRemoteUser(containerId
@@ -167,18 +167,15 @@ public class AMLauncher implements Runnable {
         rmContext.getNMTokenSecretManager().createNMToken(
             containerId.getApplicationAttemptId(), node, user);
     currentUser.addToken(ConverterUtils.convertFromYarn(token,
-        containerManagerBindAddress));
+        containerManagerConnectAddress));
 
-    return currentUser
-        .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
+    return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class,
+        currentUser, rpc, containerManagerConnectAddress);
+  }
 
-          @Override
-          public ContainerManagementProtocol run() {
-            return (ContainerManagementProtocol) rpc.getProxy(
-                ContainerManagementProtocol.class,
-                containerManagerBindAddress, conf);
-          }
-        });
+  @VisibleForTesting
+  protected YarnRPC getYarnRPC() {
+    return YarnRPC.create(conf);  // TODO: Don't create again and again.
   }
 
   private ContainerLaunchContext createAMContainerLaunchContext(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b2abb75/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 5794b43..0415502 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -134,15 +134,20 @@ public class MockRM extends ResourceManager {
     Assert.assertEquals("App state is not correct (timedout)", finalState,
         app.getState());
   }
-  
-  public void waitForState(ApplicationAttemptId attemptId, 
-                           RMAppAttemptState finalState)
+
+  public void waitForState(ApplicationAttemptId attemptId,
+      RMAppAttemptState finalState)
       throws Exception {
+    waitForState(attemptId, finalState, 40000);
+  }
+
+  public void waitForState(ApplicationAttemptId attemptId,
+      RMAppAttemptState finalState, int timeoutMsecs) throws Exception {
     RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId());
     Assert.assertNotNull("app shouldn't be null", app);
     RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
     int timeoutSecs = 0;
-    while (!finalState.equals(attempt.getAppAttemptState()) && timeoutSecs++ < 40) {
+    while (!finalState.equals(attempt.getAppAttemptState()) && timeoutSecs++ < timeoutMsecs) {
       System.out.println("AppAttempt : " + attemptId 
           + " State is : " + attempt.getAppAttemptState()
           + " Waiting for state : " + finalState);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b2abb75/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
index 11cd1fd..e54caa4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -26,6 +27,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -43,11 +45,18 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.SerializedException;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
+import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -58,6 +67,10 @@ import org.apache.log4j.Logger;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class TestApplicationMasterLauncher {
 
   private static final Log LOG = LogFactory
@@ -177,8 +190,62 @@ public class TestApplicationMasterLauncher {
     am.waitForState(RMAppAttemptState.FINISHED);
     rm.stop();
   }
-  
-    
+
+  @Test
+  public void testRetriesOnFailures() throws Exception {
+    final ContainerManagementProtocol mockProxy =
+        mock(ContainerManagementProtocol.class);
+    final StartContainersResponse mockResponse =
+        mock(StartContainersResponse.class);
+    when(mockProxy.startContainers(any(StartContainersRequest.class)))
+        .thenThrow(new NMNotYetReadyException("foo")).thenReturn(mockResponse);
+    Configuration conf = new Configuration();
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    conf.setInt(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 1);
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm = new MockRMWithCustomAMLauncher(conf, null) {
+      @Override
+      protected ApplicationMasterLauncher createAMLauncher() {
+        return new ApplicationMasterLauncher(getRMContext()) {
+          @Override
+          protected Runnable createRunnableLauncher(RMAppAttempt application,
+              AMLauncherEventType event) {
+            return new AMLauncher(context, application, event, getConfig()) {
+              @Override
+              protected YarnRPC getYarnRPC() {
+                YarnRPC mockRpc = mock(YarnRPC.class);
+
+                when(mockRpc.getProxy(
+                    any(Class.class),
+                    any(InetSocketAddress.class),
+                    any(Configuration.class)))
+                    .thenReturn(mockProxy);
+                return mockRpc;
+              }
+            };
+          }
+        };
+      }
+
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm.start();
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5120);
+
+    RMApp app = rm.submitApp(2000);
+    final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+
+    // kick the scheduling
+    nm1.nodeHeartbeat(true);
+    dispatcher.await();
+
+    rm.waitForState(appAttemptId, RMAppAttemptState.LAUNCHED, 500);
+  }
+
   @SuppressWarnings("unused")
   @Test(timeout = 100000)
   public void testallocateBeforeAMRegistration() throws Exception {