You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/05/30 16:55:00 UTC

svn commit: r1344289 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-client-ap...

Author: bobby
Date: Wed May 30 14:54:59 2012
New Revision: 1344289

URL: http://svn.apache.org/viewvc?rev=1344289&view=rev
Log:
svn merge -c 1344283. FIXES: MAPREDUCE-4152. map task left hanging after AM dies trying to connect to RM (Tom Graves via bobby)

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1344289&r1=1344288&r2=1344289&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Wed May 30 14:54:59 2012
@@ -211,6 +211,9 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-3870. Invalid App Metrics 
     (Bhallamudi Venkata Siva Kamesh via tgraves).
 
+    MAPREDUCE-4152. map task left hanging after AM dies trying to connect to RM
+    (Tom Graves via bobby)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1344289&r1=1344288&r2=1344289&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Wed May 30 14:54:59 2012
@@ -253,6 +253,10 @@ public abstract class TaskAttemptImpl im
      .addTransition(TaskAttemptState.RUNNING,
          TaskAttemptState.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
+     // if container killed by AM shutting down
+     .addTransition(TaskAttemptState.RUNNING,
+         TaskAttemptState.KILLED,
+         TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
      // Kill handling
      .addTransition(TaskAttemptState.RUNNING,
          TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
@@ -272,6 +276,10 @@ public abstract class TaskAttemptImpl im
      .addTransition(TaskAttemptState.COMMIT_PENDING,
          TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
          CLEANUP_CONTAINER_TRANSITION)
+     // if container killed by AM shutting down
+     .addTransition(TaskAttemptState.COMMIT_PENDING,
+         TaskAttemptState.KILLED,
+         TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
      .addTransition(TaskAttemptState.COMMIT_PENDING,
          TaskAttemptState.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
@@ -363,6 +371,7 @@ public abstract class TaskAttemptImpl im
              TaskAttemptEventType.TA_COMMIT_PENDING,
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_CONTAINER_CLEANED,
              // Container launch events can arrive late
              TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
              TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
@@ -384,6 +393,7 @@ public abstract class TaskAttemptImpl im
              TaskAttemptEventType.TA_COMMIT_PENDING,
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_CONTAINER_CLEANED,
              // Container launch events can arrive late
              TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
              TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
@@ -402,6 +412,7 @@ public abstract class TaskAttemptImpl im
          TaskAttemptState.SUCCEEDED,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_CONTAINER_CLEANED,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED))
 
      // Transitions from FAILED state
@@ -417,6 +428,7 @@ public abstract class TaskAttemptImpl im
              // Container launch events can arrive late
              TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
              TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
+             TaskAttemptEventType.TA_CONTAINER_CLEANED,
              TaskAttemptEventType.TA_COMMIT_PENDING,
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG))
@@ -434,6 +446,7 @@ public abstract class TaskAttemptImpl im
              // Container launch events can arrive late
              TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
              TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
+             TaskAttemptEventType.TA_CONTAINER_CLEANED,
              TaskAttemptEventType.TA_COMMIT_PENDING,
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG))

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1344289&r1=1344288&r2=1344289&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Wed May 30 14:54:59 2012
@@ -82,10 +82,12 @@ public class ContainerLauncherImpl exten
       new LinkedBlockingQueue<ContainerLauncherEvent>();
   YarnRPC rpc;
 
-  private Container getContainer(ContainerId id) {
+  private Container getContainer(ContainerLauncherEvent event) {
+    ContainerId id = event.getContainerID();
     Container c = containers.get(id);
     if(c == null) {
-      c = new Container();
+      c = new Container(event.getTaskAttemptID(), event.getContainerID(),
+          event.getContainerMgrAddress(), event.getContainerToken());
       Container old = containers.putIfAbsent(id, c);
       if(old != null) {
         c = old;
@@ -107,9 +109,19 @@ public class ContainerLauncherImpl exten
 
   private class Container {
     private ContainerState state;
+    // store enough information to be able to cleanup the container
+    private TaskAttemptId taskAttemptID;
+    private ContainerId containerID;
+    final private String containerMgrAddress;
+    private ContainerToken containerToken;
     
-    public Container() {
+    public Container(TaskAttemptId taId, ContainerId containerID,
+        String containerMgrAddress, ContainerToken containerToken) {
       this.state = ContainerState.PREP;
+      this.taskAttemptID = taId;
+      this.containerMgrAddress = containerMgrAddress;
+      this.containerID = containerID;
+      this.containerToken = containerToken;
     }
     
     public synchronized boolean isCompletelyDone() {
@@ -118,7 +130,6 @@ public class ContainerLauncherImpl exten
     
     @SuppressWarnings("unchecked")
     public synchronized void launch(ContainerRemoteLaunchEvent event) {
-      TaskAttemptId taskAttemptID = event.getTaskAttemptID();
       LOG.info("Launching " + taskAttemptID);
       if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
         state = ContainerState.DONE;
@@ -127,15 +138,10 @@ public class ContainerLauncherImpl exten
         return;
       }
       
-
-      final String containerManagerBindAddr = event.getContainerMgrAddress();
-      ContainerId containerID = event.getContainerID();
-      ContainerToken containerToken = event.getContainerToken();
-
       ContainerManager proxy = null;
       try {
 
-        proxy = getCMProxy(containerID, containerManagerBindAddr,
+        proxy = getCMProxy(containerID, containerMgrAddress,
             containerToken);
 
         // Construct the actual Container
@@ -181,35 +187,35 @@ public class ContainerLauncherImpl exten
     }
     
     @SuppressWarnings("unchecked")
-    public synchronized void kill(ContainerLauncherEvent event) {
+    public synchronized void kill() {
+
+      if(isCompletelyDone()) { 
+        return;
+      }
       if(this.state == ContainerState.PREP) {
         this.state = ContainerState.KILLED_BEFORE_LAUNCH;
       } else {
-        final String containerManagerBindAddr = event.getContainerMgrAddress();
-        ContainerId containerID = event.getContainerID();
-        ContainerToken containerToken = event.getContainerToken();
-        TaskAttemptId taskAttemptID = event.getTaskAttemptID();
         LOG.info("KILLING " + taskAttemptID);
 
         ContainerManager proxy = null;
         try {
-          proxy = getCMProxy(containerID, containerManagerBindAddr,
-              containerToken);
+          proxy = getCMProxy(this.containerID, this.containerMgrAddress,
+              this.containerToken);
 
             // kill the remote container if already launched
             StopContainerRequest stopRequest = Records
               .newRecord(StopContainerRequest.class);
-            stopRequest.setContainerId(event.getContainerID());
+            stopRequest.setContainerId(this.containerID);
             proxy.stopContainer(stopRequest);
 
         } catch (Throwable t) {
 
           // ignore the cleanup failure
           String message = "cleanup failed for container "
-            + event.getContainerID() + " : "
+            + this.containerID + " : "
             + StringUtils.stringifyException(t);
           context.getEventHandler().handle(
-            new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message));
+            new TaskAttemptDiagnosticsUpdateEvent(this.taskAttemptID, message));
           LOG.warn(message);
         } finally {
           if (proxy != null) {
@@ -220,10 +226,11 @@ public class ContainerLauncherImpl exten
       }
       // after killing, send killed event to task attempt
       context.getEventHandler().handle(
-          new TaskAttemptEvent(event.getTaskAttemptID(),
+          new TaskAttemptEvent(this.taskAttemptID,
               TaskAttemptEventType.TA_CONTAINER_CLEANED));
     }
   }
+
   // To track numNodes.
   Set<String> allNodes = new HashSet<String>();
 
@@ -308,7 +315,17 @@ public class ContainerLauncherImpl exten
     super.start();
   }
 
+  private void shutdownAllContainers() { // called from stop() before the threads are shut down
+    for (Container ct : this.containers.values()) {
+      if (ct != null) {
+        ct.kill(); // returns immediately if the container is completely done
+      }
+    }
+  }
+
   public void stop() {
+    // shutdown any containers that might be left running
+    shutdownAllContainers();
     eventHandlingThread.interrupt();
     launcherPool.shutdownNow();
     super.stop();
@@ -364,7 +381,7 @@ public class ContainerLauncherImpl exten
       // TODO: Do it only once per NodeManager.
       ContainerId containerID = event.getContainerID();
 
-      Container c = getContainer(containerID);
+      Container c = getContainer(event);
       switch(event.getType()) {
 
       case CONTAINER_REMOTE_LAUNCH:
@@ -374,7 +391,7 @@ public class ContainerLauncherImpl exten
         break;
 
       case CONTAINER_REMOTE_CLEANUP:
-        c.kill(event);
+        c.kill();
         break;
       }
       removeContainerIfDone(containerID);

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1344289&r1=1344288&r2=1344289&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Wed May 30 14:54:59 2012
@@ -72,6 +72,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@@ -450,6 +451,121 @@ public class TestTaskAttempt{
     assertFalse(eventHandler.internalError);
   }
   
+  /** TA_CONTAINER_CLEANED while RUNNING must not cause an internal error. */
+  @Test
+  public void testContainerCleanedWhileRunning() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId =
+      BuilderUtils.newApplicationAttemptId(appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl =
+      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+          splits, jobConf, taListener,
+          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+          new SystemClock(), appCtx);
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+        container, mock(Map.class)));
+    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+    assertEquals("Task attempt is not in running state",
+        TaskAttemptState.RUNNING, taImpl.getState());
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
+        eventHandler.internalError);
+  }
+
+  /** TA_CONTAINER_CLEANED in COMMIT_PENDING must not cause an internal error. */
+  @Test
+  public void testContainerCleanedWhileCommitting() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId =
+      BuilderUtils.newApplicationAttemptId(appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl =
+      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+          splits, jobConf, taListener,
+          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+          new SystemClock(), appCtx);
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+        container, mock(Map.class)));
+    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_COMMIT_PENDING));
+
+    assertEquals("Task attempt is not in commit pending state",
+        TaskAttemptState.COMMIT_PENDING, taImpl.getState());
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
+        eventHandler.internalError);
+  }
+
   public static class MockEventHandler implements EventHandler {
     public boolean internalError;
     

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java?rev=1344289&r1=1344288&r2=1344289&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java Wed May 30 14:54:59 2012
@@ -220,4 +220,58 @@ public class TestContainerLauncherImpl {
       ut.stop();
     }
   }
+
+  /** stop() must stop a container that was launched but never cleaned up. */
+  @Test
+  public void testMyShutdown() throws Exception {
+    LOG.info("in test Shutdown");
+
+    YarnRPC mockRpc = mock(YarnRPC.class);
+    AppContext mockContext = mock(AppContext.class);
+    @SuppressWarnings("rawtypes")
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
+
+    ContainerManager mockCM = mock(ContainerManager.class);
+    when(mockRpc.getProxy(eq(ContainerManager.class),
+        any(InetSocketAddress.class), any(Configuration.class)))
+        .thenReturn(mockCM);
+
+    ContainerLauncherImplUnderTest ut =
+      new ContainerLauncherImplUnderTest(mockContext, mockRpc);
+
+    Configuration conf = new Configuration();
+    ut.init(conf);
+    ut.start();
+    try {
+      ContainerId contId = makeContainerId(0L, 0, 0, 1);
+      TaskAttemptId taskAttemptId = makeTaskAttemptId(0L, 0, 0, TaskType.MAP, 0);
+      String cmAddress = "127.0.0.1:8000";
+      StartContainerResponse startResp =
+        recordFactory.newRecordInstance(StartContainerResponse.class);
+      startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
+          ShuffleHandler.serializeMetaData(80));
+
+      LOG.info("inserting launch event");
+      ContainerRemoteLaunchEvent mockLaunchEvent =
+        mock(ContainerRemoteLaunchEvent.class);
+      when(mockLaunchEvent.getType())
+        .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
+      when(mockLaunchEvent.getContainerID())
+        .thenReturn(contId);
+      when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
+      when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
+      when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
+      ut.handle(mockLaunchEvent);
+
+      ut.waitForPoolToIdle();
+
+      verify(mockCM).startContainer(any(StartContainerRequest.class));
+
+      // skip cleanup and make sure stop kills the container
+    } finally {
+      ut.stop();
+    }
+    verify(mockCM).stopContainer(any(StopContainerRequest.class));
+  }
 }