You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/19 23:22:00 UTC

hadoop git commit: YARN-2821. Fixed a problem that DistributedShell AM may hang if restarted. Contributed by Varun Vasudev (cherry picked from commit 7438966586f1896ab3e8b067d47a4af28a894106)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 7af9a78fe -> adb90c7f5


YARN-2821. Fixed a problem that DistributedShell AM may hang if restarted. Contributed by Varun Vasudev
(cherry picked from commit 7438966586f1896ab3e8b067d47a4af28a894106)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/adb90c7f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/adb90c7f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/adb90c7f

Branch: refs/heads/branch-2
Commit: adb90c7f52be4c443a1050b2bfcbcb5cdf8542f5
Parents: 7af9a78
Author: Jian He <ji...@apache.org>
Authored: Tue May 19 14:20:31 2015 -0700
Committer: Jian He <ji...@apache.org>
Committed: Tue May 19 14:21:48 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../pom.xml                                     |   5 +
 .../distributedshell/ApplicationMaster.java     |  54 +++++++-
 .../distributedshell/TestDSAppMaster.java       | 130 +++++++++++++++++++
 4 files changed, 187 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/adb90c7f/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index c97df93..16cb27b 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -375,6 +375,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3302. TestDockerContainerExecutor should run automatically if it can
     detect docker in the usual place (Ravindra Kumar Naik via raviprak)
 
+    YARN-2821. Fixed a problem that DistributedShell AM may hang if restarted.
+    (Varun Vasudev via jianhe)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/adb90c7f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
index 24f8bcc..6ac8bf1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
@@ -116,6 +116,11 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/adb90c7f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index b62c24c..b28c0c9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -30,10 +30,12 @@ import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -277,6 +279,10 @@ public class ApplicationMaster {
   private final String linux_bash_command = "bash";
   private final String windows_command = "cmd /c";
 
+  @VisibleForTesting
+  protected final Set<ContainerId> launchedContainers =
+      Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
+
   /**
    * @param args Command line args
    */
@@ -601,8 +607,12 @@ public class ApplicationMaster {
         response.getContainersFromPreviousAttempts();
     LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
       + " previous attempts' running containers on AM registration.");
+    for(Container container: previousAMRunningContainers) {
+      launchedContainers.add(container.getId());
+    }
     numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
 
+
     int numTotalContainersToRequest =
         numTotalContainers - previousAMRunningContainers.size();
     // Setup ask for containers from RM
@@ -715,8 +725,9 @@ public class ApplicationMaster {
 
     return success;
   }
-  
-  private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+
+  @VisibleForTesting
+  class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
     @SuppressWarnings("unchecked")
     @Override
     public void onContainersCompleted(List<ContainerStatus> completedContainers) {
@@ -731,6 +742,14 @@ public class ApplicationMaster {
 
         // non complete containers should not be here
         assert (containerStatus.getState() == ContainerState.COMPLETE);
+        // ignore containers we know nothing about - probably from a previous
+        // attempt
+        if (!launchedContainers.contains(containerStatus.getContainerId())) {
+          LOG.info("Ignoring completed status of "
+              + containerStatus.getContainerId()
+              + "; unknown container(probably launched by previous attempt)");
+          continue;
+        }
 
         // increment counters for completed/failed containers
         int exitStatus = containerStatus.getExitStatus();
@@ -796,14 +815,13 @@ public class ApplicationMaster {
         // + ", containerToken"
         // +allocatedContainer.getContainerToken().getIdentifier().toString());
 
-        LaunchContainerRunnable runnableLaunchContainer =
-            new LaunchContainerRunnable(allocatedContainer, containerListener);
-        Thread launchThread = new Thread(runnableLaunchContainer);
+        Thread launchThread = createLaunchContainerThread(allocatedContainer);
 
         // launch and start the container on a separate thread to keep
         // the main thread unblocked
         // as all containers may not be allocated at one go.
         launchThreads.add(launchThread);
+        launchedContainers.add(allocatedContainer.getId());
         launchThread.start();
       }
     }
@@ -1150,4 +1168,30 @@ public class ApplicationMaster {
           + appAttemptId.toString(), e);
     }
   }
+
+  RMCallbackHandler getRMCallbackHandler() {
+    return new RMCallbackHandler();
+  }
+
+  @VisibleForTesting
+  void setAmRMClient(AMRMClientAsync client) {
+    this.amRMClient = client;
+  }
+
+  @VisibleForTesting
+  int getNumCompletedContainers() {
+    return numCompletedContainers.get();
+  }
+
+  @VisibleForTesting
+  boolean getDone() {
+    return done;
+  }
+
+  @VisibleForTesting
+  Thread createLaunchContainerThread(Container allocatedContainer) {
+    LaunchContainerRunnable runnableLaunchContainer =
+        new LaunchContainerRunnable(allocatedContainer, containerListener);
+    return new Thread(runnableLaunchContainer);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/adb90c7f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
index 11e840a..0fed14d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
@@ -20,13 +20,143 @@ package org.apache.hadoop.yarn.applications.distributedshell;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A bunch of tests to make sure that the container allocations
+ * and releases occur correctly.
+ */
 public class TestDSAppMaster {
 
+  static class TestAppMaster extends ApplicationMaster {
+    private int threadsLaunched = 0;
+
+    @Override
+    protected Thread createLaunchContainerThread(Container allocatedContainer) {
+      threadsLaunched++;
+      launchedContainers.add(allocatedContainer.getId());
+      return new Thread();
+    }
+
+    void setNumTotalContainers(int numTotalContainers) {
+      this.numTotalContainers = numTotalContainers;
+    }
+
+    int getAllocatedContainers() {
+      return this.numAllocatedContainers.get();
+    }
+
+    @Override
+    void startTimelineClient(final Configuration conf) throws YarnException,
+        IOException, InterruptedException {
+      timelineClient = null;
+    }
+  }
+
+  /** Verifies allocation/completion counters; unknown ids are ignored. */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testDSAppMasterAllocateHandler() throws Exception {
+    TestAppMaster master = new TestAppMaster();
+    int targetContainers = 2;
+    AMRMClientAsync mockClient = Mockito.mock(AMRMClientAsync.class);
+    master.setAmRMClient(mockClient);
+    master.setNumTotalContainers(targetContainers);
+    Mockito.doNothing().when(mockClient)
+        .addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class));
+    ApplicationMaster.RMCallbackHandler handler = master.getRMCallbackHandler();
+
+    List<Container> containers = new ArrayList<>(1);
+    ContainerId id1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+    containers.add(generateContainer(id1));
+
+    master.numRequestedContainers.set(targetContainers);
+
+    // first allocate a single container, everything should be fine
+    handler.onContainersAllocated(containers);
+    Assert.assertEquals("Wrong container allocation count", 1,
+        master.getAllocatedContainers());
+    Mockito.verifyZeroInteractions(mockClient);
+    Assert.assertEquals("Incorrect number of threads launched", 1,
+        master.threadsLaunched);
+
+    // now send 3 extra containers
+    containers.clear();
+    ContainerId id2 = BuilderUtils.newContainerId(1, 1, 1, 2);
+    containers.add(generateContainer(id2));
+    ContainerId id3 = BuilderUtils.newContainerId(1, 1, 1, 3);
+    containers.add(generateContainer(id3));
+    ContainerId id4 = BuilderUtils.newContainerId(1, 1, 1, 4);
+    containers.add(generateContainer(id4));
+    handler.onContainersAllocated(containers);
+    Assert.assertEquals("Wrong final container allocation count", 4,
+        master.getAllocatedContainers());
+    Assert.assertEquals("Incorrect number of threads launched", 4,
+        master.threadsLaunched);
+
+    // make sure we handle completion events correctly
+    List<ContainerStatus> status = new ArrayList<>();
+    status.add(generateContainerStatus(id1, ContainerExitStatus.SUCCESS));
+    status.add(generateContainerStatus(id2, ContainerExitStatus.SUCCESS));
+    status.add(generateContainerStatus(id3, ContainerExitStatus.ABORTED));
+    status.add(generateContainerStatus(id4, ContainerExitStatus.ABORTED));
+    handler.onContainersCompleted(status);
+
+    Assert.assertEquals("Unexpected number of completed containers",
+        targetContainers, master.getNumCompletedContainers());
+    Assert.assertTrue("Master didn't finish containers as expected",
+        master.getDone());
+
+    // id5 was never allocated to this master, so its completion events must
+    // be delivered to the handler and leave the counters untouched
+    status = new ArrayList<>();
+    ContainerId id5 = BuilderUtils.newContainerId(1, 1, 1, 5);
+    status.add(generateContainerStatus(id5, ContainerExitStatus.ABORTED));
+    handler.onContainersCompleted(status);
+    Assert.assertEquals("Unexpected number of completed containers",
+        targetContainers, master.getNumCompletedContainers());
+    Assert.assertTrue("Master didn't finish containers as expected",
+        master.getDone());
+    status.add(generateContainerStatus(id5, ContainerExitStatus.SUCCESS));
+    handler.onContainersCompleted(status);
+    Assert.assertEquals("Unexpected number of completed containers",
+        targetContainers, master.getNumCompletedContainers());
+    Assert.assertTrue("Master didn't finish containers as expected",
+        master.getDone());
+  }
+
+  private Container generateContainer(ContainerId cid) {
+    return Container.newInstance(cid, NodeId.newInstance("host", 5000),
+      "host:80", Resource.newInstance(1024, 1), Priority.newInstance(0), null);
+  }
+
+  private ContainerStatus
+      generateContainerStatus(ContainerId id, int exitStatus) {
+    return ContainerStatus.newInstance(id, ContainerState.COMPLETE, "",
+      exitStatus);
+  }
+
   @Test
   public void testTimelineClientInDSAppMaster() throws Exception {
     ApplicationMaster appMaster = new ApplicationMaster();