You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/05/30 16:55:00 UTC
svn commit: r1344289 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
hadoop-mapreduce-client/hadoop-mapreduce-client-ap...
Author: bobby
Date: Wed May 30 14:54:59 2012
New Revision: 1344289
URL: http://svn.apache.org/viewvc?rev=1344289&view=rev
Log:
svn merge -c 1344283. FIXES: MAPREDUCE-4152. map task left hanging after AM dies trying to connect to RM (Tom Graves via bobby)
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1344289&r1=1344288&r2=1344289&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Wed May 30 14:54:59 2012
@@ -211,6 +211,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-3870. Invalid App Metrics
(Bhallamudi Venkata Siva Kamesh via tgraves).
+ MAPREDUCE-4152. map task left hanging after AM dies trying to connect to RM
+ (Tom Graves via bobby)
+
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1344289&r1=1344288&r2=1344289&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Wed May 30 14:54:59 2012
@@ -253,6 +253,10 @@ public abstract class TaskAttemptImpl im
.addTransition(TaskAttemptState.RUNNING,
TaskAttemptState.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
+ // if container killed by AM shutting down
+ .addTransition(TaskAttemptState.RUNNING,
+ TaskAttemptState.KILLED,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
// Kill handling
.addTransition(TaskAttemptState.RUNNING,
TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
@@ -272,6 +276,10 @@ public abstract class TaskAttemptImpl im
.addTransition(TaskAttemptState.COMMIT_PENDING,
TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
CLEANUP_CONTAINER_TRANSITION)
+ // if container killed by AM shutting down
+ .addTransition(TaskAttemptState.COMMIT_PENDING,
+ TaskAttemptState.KILLED,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
.addTransition(TaskAttemptState.COMMIT_PENDING,
TaskAttemptState.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
@@ -363,6 +371,7 @@ public abstract class TaskAttemptImpl im
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED,
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
@@ -384,6 +393,7 @@ public abstract class TaskAttemptImpl im
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED,
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
@@ -402,6 +412,7 @@ public abstract class TaskAttemptImpl im
TaskAttemptState.SUCCEEDED,
EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_FAILMSG,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED))
// Transitions from FAILED state
@@ -417,6 +428,7 @@ public abstract class TaskAttemptImpl im
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG))
@@ -434,6 +446,7 @@ public abstract class TaskAttemptImpl im
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG))
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1344289&r1=1344288&r2=1344289&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Wed May 30 14:54:59 2012
@@ -82,10 +82,12 @@ public class ContainerLauncherImpl exten
new LinkedBlockingQueue<ContainerLauncherEvent>();
YarnRPC rpc;
- private Container getContainer(ContainerId id) {
+ private Container getContainer(ContainerLauncherEvent event) {
+ ContainerId id = event.getContainerID();
Container c = containers.get(id);
if(c == null) {
- c = new Container();
+ c = new Container(event.getTaskAttemptID(), event.getContainerID(),
+ event.getContainerMgrAddress(), event.getContainerToken());
Container old = containers.putIfAbsent(id, c);
if(old != null) {
c = old;
@@ -107,9 +109,19 @@ public class ContainerLauncherImpl exten
private class Container {
private ContainerState state;
+ // store enough information to be able to cleanup the container
+ private TaskAttemptId taskAttemptID;
+ private ContainerId containerID;
+ final private String containerMgrAddress;
+ private ContainerToken containerToken;
- public Container() {
+ public Container(TaskAttemptId taId, ContainerId containerID,
+ String containerMgrAddress, ContainerToken containerToken) {
this.state = ContainerState.PREP;
+ this.taskAttemptID = taId;
+ this.containerMgrAddress = containerMgrAddress;
+ this.containerID = containerID;
+ this.containerToken = containerToken;
}
public synchronized boolean isCompletelyDone() {
@@ -118,7 +130,6 @@ public class ContainerLauncherImpl exten
@SuppressWarnings("unchecked")
public synchronized void launch(ContainerRemoteLaunchEvent event) {
- TaskAttemptId taskAttemptID = event.getTaskAttemptID();
LOG.info("Launching " + taskAttemptID);
if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
state = ContainerState.DONE;
@@ -127,15 +138,10 @@ public class ContainerLauncherImpl exten
return;
}
-
- final String containerManagerBindAddr = event.getContainerMgrAddress();
- ContainerId containerID = event.getContainerID();
- ContainerToken containerToken = event.getContainerToken();
-
ContainerManager proxy = null;
try {
- proxy = getCMProxy(containerID, containerManagerBindAddr,
+ proxy = getCMProxy(containerID, containerMgrAddress,
containerToken);
// Construct the actual Container
@@ -181,35 +187,35 @@ public class ContainerLauncherImpl exten
}
@SuppressWarnings("unchecked")
- public synchronized void kill(ContainerLauncherEvent event) {
+ public synchronized void kill() {
+
+ if(isCompletelyDone()) {
+ return;
+ }
if(this.state == ContainerState.PREP) {
this.state = ContainerState.KILLED_BEFORE_LAUNCH;
} else {
- final String containerManagerBindAddr = event.getContainerMgrAddress();
- ContainerId containerID = event.getContainerID();
- ContainerToken containerToken = event.getContainerToken();
- TaskAttemptId taskAttemptID = event.getTaskAttemptID();
LOG.info("KILLING " + taskAttemptID);
ContainerManager proxy = null;
try {
- proxy = getCMProxy(containerID, containerManagerBindAddr,
- containerToken);
+ proxy = getCMProxy(this.containerID, this.containerMgrAddress,
+ this.containerToken);
// kill the remote container if already launched
StopContainerRequest stopRequest = Records
.newRecord(StopContainerRequest.class);
- stopRequest.setContainerId(event.getContainerID());
+ stopRequest.setContainerId(this.containerID);
proxy.stopContainer(stopRequest);
} catch (Throwable t) {
// ignore the cleanup failure
String message = "cleanup failed for container "
- + event.getContainerID() + " : "
+ + this.containerID + " : "
+ StringUtils.stringifyException(t);
context.getEventHandler().handle(
- new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message));
+ new TaskAttemptDiagnosticsUpdateEvent(this.taskAttemptID, message));
LOG.warn(message);
} finally {
if (proxy != null) {
@@ -220,10 +226,11 @@ public class ContainerLauncherImpl exten
}
// after killing, send killed event to task attempt
context.getEventHandler().handle(
- new TaskAttemptEvent(event.getTaskAttemptID(),
+ new TaskAttemptEvent(this.taskAttemptID,
TaskAttemptEventType.TA_CONTAINER_CLEANED));
}
}
+
// To track numNodes.
Set<String> allNodes = new HashSet<String>();
@@ -308,7 +315,17 @@ public class ContainerLauncherImpl exten
super.start();
}
+ /**
+  * Kills every container still present in the containers map. Used by
+  * stop() so that no launched container is left running when the launcher
+  * goes away.
+  */
+ private void shutdownAllContainers() {
+   for (Container tracked : this.containers.values()) {
+     if (tracked == null) {
+       continue;
+     }
+     tracked.kill();
+   }
+ }
+
public void stop() {
+ // shutdown any containers that might be left running
+ shutdownAllContainers();
eventHandlingThread.interrupt();
launcherPool.shutdownNow();
super.stop();
@@ -364,7 +381,7 @@ public class ContainerLauncherImpl exten
// TODO: Do it only once per NodeManager.
ContainerId containerID = event.getContainerID();
- Container c = getContainer(containerID);
+ Container c = getContainer(event);
switch(event.getType()) {
case CONTAINER_REMOTE_LAUNCH:
@@ -374,7 +391,7 @@ public class ContainerLauncherImpl exten
break;
case CONTAINER_REMOTE_CLEANUP:
- c.kill(event);
+ c.kill();
break;
}
removeContainerIfDone(containerID);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1344289&r1=1344288&r2=1344289&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Wed May 30 14:54:59 2012
@@ -72,6 +72,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@@ -450,6 +451,121 @@ public class TestTaskAttempt{
assertFalse(eventHandler.internalError);
}
+ /**
+  * Drives a map task attempt to RUNNING and then delivers
+  * TA_CONTAINER_CLEANED, the event the container launcher sends after it
+  * kills a container. The attempt must handle the event without
+  * eventHandler.internalError being set.
+  */
+ @Test
+ public void testContainerCleanedWhileRunning() throws Exception {
+   ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+   ApplicationAttemptId appAttemptId =
+     BuilderUtils.newApplicationAttemptId(appId, 0);
+   JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+   TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+   TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+   Path jobFile = mock(Path.class);
+
+   MockEventHandler eventHandler = new MockEventHandler();
+   TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+   when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+
+   JobConf jobConf = new JobConf();
+   jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+   jobConf.setBoolean("fs.file.impl.disable.cache", true);
+   jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+   jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+   TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+   when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+
+   AppContext appCtx = mock(AppContext.class);
+   ClusterInfo clusterInfo = mock(ClusterInfo.class);
+   Resource resource = mock(Resource.class);
+   when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+   when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+   when(resource.getMemory()).thenReturn(1024);
+
+   TaskAttemptImpl taImpl =
+     new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+       splits, jobConf, taListener,
+       mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+       new SystemClock(), appCtx);
+
+   NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+   ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+   Container container = mock(Container.class);
+   when(container.getId()).thenReturn(contId);
+   when(container.getNodeId()).thenReturn(nid);
+   when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+   // Walk the attempt through schedule -> assigned -> launched.
+   taImpl.handle(new TaskAttemptEvent(attemptId,
+     TaskAttemptEventType.TA_SCHEDULE));
+   taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+     container, mock(Map.class)));
+   taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+   // assertEquals takes (message, expected, actual); the expected state goes
+   // first so a failure reports the two values the right way round.
+   assertEquals("Task attempt is not in running state",
+     TaskAttemptState.RUNNING, taImpl.getState());
+   taImpl.handle(new TaskAttemptEvent(attemptId,
+     TaskAttemptEventType.TA_CONTAINER_CLEANED));
+   assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
+     eventHandler.internalError);
+ }
+
+ /**
+  * Drives a map task attempt to COMMIT_PENDING and then delivers
+  * TA_CONTAINER_CLEANED, the event the container launcher sends after it
+  * kills a container. The attempt must handle the event without
+  * eventHandler.internalError being set.
+  */
+ @Test
+ public void testContainerCleanedWhileCommitting() throws Exception {
+   ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+   ApplicationAttemptId appAttemptId =
+     BuilderUtils.newApplicationAttemptId(appId, 0);
+   JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+   TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+   TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+   Path jobFile = mock(Path.class);
+
+   MockEventHandler eventHandler = new MockEventHandler();
+   TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+   when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+
+   JobConf jobConf = new JobConf();
+   jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+   jobConf.setBoolean("fs.file.impl.disable.cache", true);
+   jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+   jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+   TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+   when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+
+   AppContext appCtx = mock(AppContext.class);
+   ClusterInfo clusterInfo = mock(ClusterInfo.class);
+   Resource resource = mock(Resource.class);
+   when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+   when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+   when(resource.getMemory()).thenReturn(1024);
+
+   TaskAttemptImpl taImpl =
+     new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+       splits, jobConf, taListener,
+       mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+       new SystemClock(), appCtx);
+
+   NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+   ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+   Container container = mock(Container.class);
+   when(container.getId()).thenReturn(contId);
+   when(container.getNodeId()).thenReturn(nid);
+   when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+   // Walk the attempt through schedule -> assigned -> launched -> commit pending.
+   taImpl.handle(new TaskAttemptEvent(attemptId,
+     TaskAttemptEventType.TA_SCHEDULE));
+   taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+     container, mock(Map.class)));
+   taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+   taImpl.handle(new TaskAttemptEvent(attemptId,
+     TaskAttemptEventType.TA_COMMIT_PENDING));
+
+   // assertEquals takes (message, expected, actual); the expected state goes
+   // first so a failure reports the two values the right way round.
+   assertEquals("Task attempt is not in commit pending state",
+     TaskAttemptState.COMMIT_PENDING, taImpl.getState());
+   taImpl.handle(new TaskAttemptEvent(attemptId,
+     TaskAttemptEventType.TA_CONTAINER_CLEANED));
+   assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
+     eventHandler.internalError);
+ }
+
public static class MockEventHandler implements EventHandler {
public boolean internalError;
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java?rev=1344289&r1=1344288&r2=1344289&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java Wed May 30 14:54:59 2012
@@ -220,4 +220,58 @@ public class TestContainerLauncherImpl {
ut.stop();
}
}
+
+ /**
+  * Launches one container through the launcher and then stops the launcher
+  * without ever sending a cleanup event. stop() itself must issue
+  * stopContainer for the container that was started.
+  */
+ @Test
+ public void testMyShutdown() throws Exception {
+   LOG.info("in test Shutdown");
+
+   YarnRPC mockRpc = mock(YarnRPC.class);
+   AppContext mockContext = mock(AppContext.class);
+   @SuppressWarnings("rawtypes")
+   EventHandler mockEventHandler = mock(EventHandler.class);
+   when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
+
+   ContainerManager mockCM = mock(ContainerManager.class);
+   when(mockRpc.getProxy(eq(ContainerManager.class),
+     any(InetSocketAddress.class), any(Configuration.class)))
+     .thenReturn(mockCM);
+
+   ContainerLauncherImplUnderTest ut =
+     new ContainerLauncherImplUnderTest(mockContext, mockRpc);
+
+   Configuration conf = new Configuration();
+   ut.init(conf);
+   ut.start();
+   try {
+     // Upper-case L suffix: a lower-case 'l' is easily misread as the digit 1.
+     ContainerId contId = makeContainerId(0L, 0, 0, 1);
+     TaskAttemptId taskAttemptId = makeTaskAttemptId(0L, 0, 0, TaskType.MAP, 0);
+     String cmAddress = "127.0.0.1:8000";
+     StartContainerResponse startResp =
+       recordFactory.newRecordInstance(StartContainerResponse.class);
+     startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
+       ShuffleHandler.serializeMetaData(80));
+
+     LOG.info("inserting launch event");
+     ContainerRemoteLaunchEvent mockLaunchEvent =
+       mock(ContainerRemoteLaunchEvent.class);
+     when(mockLaunchEvent.getType())
+       .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
+     when(mockLaunchEvent.getContainerID())
+       .thenReturn(contId);
+     when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
+     when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
+     when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
+     ut.handle(mockLaunchEvent);
+
+     ut.waitForPoolToIdle();
+
+     verify(mockCM).startContainer(any(StartContainerRequest.class));
+
+     // skip cleanup and make sure stop kills the container
+
+   } finally {
+     ut.stop();
+   }
+
+   // Checked outside the finally block so that a failure inside the try body
+   // is reported as-is instead of being replaced by a verify failure.
+   verify(mockCM).stopContainer(any(StopContainerRequest.class));
+ }
}