You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/05/29 23:18:25 UTC

git commit: TEZ-113. Add unit tests for AMContainer. (sseth)

Updated Branches:
  refs/heads/TEZ-1 2340884c3 -> a74b43673


TEZ-113. Add unit tests for AMContainer. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/a74b4367
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/a74b4367
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/a74b4367

Branch: refs/heads/TEZ-1
Commit: a74b43673dddf2f9d488f70b3bc337577b1c8dfa
Parents: 2340884
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed May 29 14:18:00 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed May 29 14:18:00 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/tez/dag/app/AppContext.java    |    2 +-
 .../tez/dag/app/TaskAttemptListenerImpTezDag.java  |    4 +-
 .../dag/app/rm/container/AMContainerHelpers.java   |    1 -
 .../tez/dag/app/rm/container/AMContainerImpl.java  |   19 +-
 .../tez/dag/app/rm/container/TestAMContainer.java  | 1008 +++++++++++++++
 5 files changed, 1024 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a74b4367/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 041de56..03e1059 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -53,7 +53,7 @@ public interface AppContext {
 
   long getStartTime();
 
-  CharSequence getUser();
+  String getUser();
 
   DAG getDAG();
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a74b4367/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index d974cd8..7600db4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -504,9 +504,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     return COMPLETION_RESPONSE_NO_WAIT;
   }
   
-  
-  
-  // TODO EVENTUALLY remove all mrv2 ids.
+
   @Override
   public void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
     attemptToContainerIdMap.remove(attemptId);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a74b4367/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 6a9ae2e..357eeda 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -172,7 +172,6 @@ public class AMContainerHelpers {
     Map<String, String> myEnv = new HashMap<String, String>(env.size());
     myEnv.putAll(env);
     myEnv.putAll(vertexEnv);
-    // TODO TEZ-38 MRChildJVM2.setEnv should become a no-op
 
     // Set up the launch command
     List<String> commands = TezEngineChildJVM.getVMCommand(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a74b4367/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 1089013..51246dd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -54,8 +54,6 @@ import org.apache.tez.dag.app.rm.AMSchedulerEventContainerCompleted;
 import org.apache.tez.dag.app.rm.AMSchedulerEventDeallocateContainer;
 import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
 import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 @SuppressWarnings("rawtypes")
@@ -156,7 +154,7 @@ public class AMContainerImpl implements AMContainer {
         .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, new TimedOutAtRunningTransition())
         .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtRunningTransition())
         .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), new ErrorAtRunningTransition())
-        
+
         .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
         .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
         .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition())
@@ -168,11 +166,12 @@ public class AMContainerImpl implements AMContainer {
         
         .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
         .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
+        // TODO This transition is wrong. Should be a noop / error.
         .addTransition(AMContainerState.STOPPING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition())
         .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
         .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
         .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_REQUEST, new ErrorAtStoppingTransition())
-        
+
         .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtCompletedTransition())
         .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
         .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
@@ -236,7 +235,11 @@ public class AMContainerImpl implements AMContainer {
   public List<TezTaskAttemptID> getQueuedTaskAttempts() {
     readLock.lock();
     try {
-      return Collections.singletonList(this.pendingAttempt);
+      if (pendingAttempt != null) {
+        return Collections.singletonList(this.pendingAttempt);
+      } else {
+        return Collections.emptyList();
+      }
     } finally {
       readLock.unlock();
     }
@@ -261,6 +264,10 @@ public class AMContainerImpl implements AMContainer {
       readLock.unlock();
     }
   }
+  
+  public boolean isInErrorState() {
+    return inError;
+  }
 
   @Override
   public void handle(AMContainerEvent event) {
@@ -333,6 +340,8 @@ public class AMContainerImpl implements AMContainer {
           container.taskAttemptListener, event.getCredentials(),
           event.shouldProfile(), container.appContext);
 
+      // Registering now, so that in case of delayed NM response, the child 
+      // task is not told to die since the TAL does not know about the container.
       container.registerWithTAListener();
       container.sendStartRequestToNM();
       LOG.info("Sending Launch Request for Container with id: " +

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a74b4367/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
new file mode 100644
index 0000000..574a2c5
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -0,0 +1,1008 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.tez.common.TezTaskContext;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ContainerHeartbeatHandler;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.app.rm.AMSchedulerEventType;
+import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+public class TestAMContainer {
+
+  
+  @Test
+  // Assign before launch.
+  public void tetSingleSuccessfulTaskFlow() {
+    WrappedContainer wc = new WrappedContainer();
+    
+    wc.verifyState(AMContainerState.ALLOCATED);
+
+    // Launch request.
+    wc.launchContainer();
+    wc.verifyState(AMContainerState.LAUNCHING);
+    // 1 Launch request.
+    wc.verifyCountAndGetOutgoingEvents(1);
+    
+    // Assign task.
+    wc.assignTaskAttempt(wc.taskAttemptID);
+    wc.verifyState(AMContainerState.LAUNCHING);
+    wc.verifyNoOutgoingEvents();
+    assertEquals(wc.taskAttemptID, wc.amContainer.getQueuedTaskAttempts()
+        .get(0));
+    
+    // Container Launched
+    wc.containerLaunched();
+    wc.verifyState(AMContainerState.IDLE);
+    wc.verifyNoOutgoingEvents();
+    assertEquals(wc.taskAttemptID, wc.amContainer.getQueuedTaskAttempts()
+        .get(0));
+    assertNull(wc.amContainer.getRunningTaskAttempt());
+    verify(wc.tal).registerRunningContainer(wc.containerID);
+    verify(wc.chh).register(wc.containerID);
+    
+    // Pull TA
+    AMContainerTask pulledTask = wc.pullTaskToRun();
+    wc.verifyState(AMContainerState.RUNNING);
+    wc.verifyNoOutgoingEvents();
+    assertFalse(pulledTask.shouldDie());
+    assertEquals(wc.tezTaskContext.getTaskAttemptId(), pulledTask.getTask()
+        .getTaskAttemptId());
+    assertEquals(wc.taskAttemptID, wc.amContainer.getRunningTaskAttempt());
+    assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
+    
+    wc.taskAttemptSucceeded(wc.taskAttemptID);
+    wc.verifyState(AMContainerState.IDLE);
+    wc.verifyNoOutgoingEvents();
+    assertNull(wc.amContainer.getRunningTaskAttempt());
+    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
+
+    wc.containerCompleted();
+    wc.verifyState(AMContainerState.COMPLETED);
+    // 1 Scheduler completed event.
+    wc.verifyCountAndGetOutgoingEvents(1);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.chh).unregister(wc.containerID);
+
+    assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size());
+    assertFalse(wc.amContainer.isInErrorState());
+  }
+  
+  @Test
+  // Assign after launch.
+  public void testSingleSuccessfulTaskFlow2() {
+    WrappedContainer wc = new WrappedContainer();
+    
+    wc.verifyState(AMContainerState.ALLOCATED);
+
+    // Launch request.
+    wc.launchContainer();
+    wc.verifyState(AMContainerState.LAUNCHING);
+    // 1 Launch request.
+    wc.verifyCountAndGetOutgoingEvents(1);
+    
+    // Container Launched
+    wc.containerLaunched();
+    wc.verifyState(AMContainerState.IDLE);
+    wc.verifyNoOutgoingEvents();
+    assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
+    verify(wc.tal).registerRunningContainer(wc.containerID);
+    verify(wc.chh).register(wc.containerID);
+    
+    // Assign task.
+    wc.assignTaskAttempt(wc.taskAttemptID);
+    wc.verifyState(AMContainerState.IDLE);
+    wc.verifyNoOutgoingEvents();
+    assertEquals(wc.taskAttemptID, wc.amContainer.getQueuedTaskAttempts()
+        .get(0));
+    assertNull(wc.amContainer.getRunningTaskAttempt());
+
+    // Pull TA
+    AMContainerTask pulledTask = wc.pullTaskToRun();
+    wc.verifyState(AMContainerState.RUNNING);
+    wc.verifyNoOutgoingEvents();
+    assertFalse(pulledTask.shouldDie());
+    assertEquals(wc.tezTaskContext.getTaskAttemptId(), pulledTask.getTask()
+        .getTaskAttemptId());
+    assertEquals(wc.taskAttemptID, wc.amContainer.getRunningTaskAttempt());
+    assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
+
+    wc.taskAttemptSucceeded(wc.taskAttemptID);
+    wc.verifyState(AMContainerState.IDLE);
+    wc.verifyNoOutgoingEvents();
+    assertNull(wc.amContainer.getRunningTaskAttempt());
+    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
+
+    wc.containerCompleted();
+    wc.verifyState(AMContainerState.COMPLETED);
+    // 1 Scheduler completed event.
+    wc.verifyCountAndGetOutgoingEvents(1);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.chh).unregister(wc.containerID);
+    
+    assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size());
+    assertFalse(wc.amContainer.isInErrorState());
+  }
+  
+  @Test
+  public void testSingleSuccessfulTaskFlowStopRequest() {
+    WrappedContainer wc = new WrappedContainer();
+
+    wc.verifyState(AMContainerState.ALLOCATED);
+
+    wc.launchContainer();
+    wc.assignTaskAttempt(wc.taskAttemptID);
+    wc.containerLaunched();
+    wc.pullTaskToRun();
+    wc.taskAttemptSucceeded(wc.taskAttemptID);
+
+    wc.stopRequest();
+    wc.verifyState(AMContainerState.STOP_REQUESTED);
+    // Event to NM to stop the container.
+    wc.verifyCountAndGetOutgoingEvents(1);
+    assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() ==
+        NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
+    
+    wc.nmStopSent();
+    wc.verifyState(AMContainerState.STOPPING);
+    wc.verifyNoOutgoingEvents();
+
+    wc.containerCompleted();
+    wc.verifyState(AMContainerState.COMPLETED);
+    // 1 Scheduler completed event.
+    wc.verifyCountAndGetOutgoingEvents(1);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.chh).unregister(wc.containerID);
+
+    assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
+    assertNull(wc.amContainer.getRunningTaskAttempt());
+    assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size());
+    assertFalse(wc.amContainer.isInErrorState());
+  }
+  
+  @Test
+  public void testSingleSuccessfulTaskFlowFailedNMStopRequest() {
+    WrappedContainer wc = new WrappedContainer();
+
+    wc.verifyState(AMContainerState.ALLOCATED);
+
+    wc.launchContainer();
+    wc.assignTaskAttempt(wc.taskAttemptID);
+    wc.containerLaunched();
+    wc.pullTaskToRun();
+    wc.taskAttemptSucceeded(wc.taskAttemptID);
+
+    wc.stopRequest();
+    wc.verifyState(AMContainerState.STOP_REQUESTED);
+    // Event to NM to stop the container.
+    wc.verifyCountAndGetOutgoingEvents(1);
+    assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() ==
+        NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
+    
+    wc.nmStopFailed();
+    wc.verifyState(AMContainerState.STOPPING);
+    // Event to ask a RM container release.
+    wc.verifyCountAndGetOutgoingEvents(1);
+    assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() ==
+        AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+    
+    wc.containerCompleted();
+    wc.verifyState(AMContainerState.COMPLETED);
+    // 1 Scheduler completed event.
+    wc.verifyCountAndGetOutgoingEvents(1);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.chh).unregister(wc.containerID);
+
+    assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
+    assertNull(wc.amContainer.getRunningTaskAttempt());
+    assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size());
+    assertFalse(wc.amContainer.isInErrorState());
+  }
+  
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testMultipleAllocationsAtIdle() {
+    WrappedContainer wc = new WrappedContainer();
+    List<Event> outgoingEvents;
+    
+    wc.launchContainer();
+    wc.containerLaunched();
+    wc.assignTaskAttempt(wc.taskAttemptID);
+    wc.verifyState(AMContainerState.IDLE);
+    
+    TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+    wc.assignTaskAttempt(taID2);
+    
+    wc.verifyState(AMContainerState.STOP_REQUESTED);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.chh).unregister(wc.containerID);
+    // 1 for NM stop request. 2 TERMINATING to TaskAttempt.
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        NMCommunicatorEventType.CONTAINER_STOP_REQUEST,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATING);
+    assertTrue(wc.amContainer.isInErrorState());
+    
+    wc.nmStopSent();
+    wc.containerCompleted();
+    // 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+        AMSchedulerEventType.S_CONTAINER_COMPLETED);
+    
+    assertNull(wc.amContainer.getRunningTaskAttempt());
+//    assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. Set/Unset properly.
+//    assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // TODO. Set/Unset properly.
+  }
+  
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testAllocationAtRunning() {
+    WrappedContainer wc = new WrappedContainer();
+    List<Event> outgoingEvents;
+    
+    wc.launchContainer();
+    wc.containerLaunched();
+    wc.assignTaskAttempt(wc.taskAttemptID);
+    wc.pullTaskToRun();
+    wc.verifyState(AMContainerState.RUNNING);
+    
+    TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+    wc.assignTaskAttempt(taID2);
+    
+    wc.verifyState(AMContainerState.STOP_REQUESTED);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.chh).unregister(wc.containerID);
+    // 1 for NM stop request. 2 TERMINATING to TaskAttempt.
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        NMCommunicatorEventType.CONTAINER_STOP_REQUEST,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATING);
+    assertTrue(wc.amContainer.isInErrorState());
+    
+    wc.nmStopSent();
+    wc.containerCompleted();
+    // 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+        AMSchedulerEventType.S_CONTAINER_COMPLETED);
+    
+//  assertNull(wc.amContainer.getRunningTaskAttempt()); // TODO Set/Unset properly.
+//  assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. Set/Unset properly.
+//  assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // TODO. Set/Unset properly.
+  }
+  
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testMultipleAllocationsAtLaunching() {
+    WrappedContainer wc = new WrappedContainer();
+    List<Event> outgoingEvents;
+    
+    wc.launchContainer();
+    wc.assignTaskAttempt(wc.taskAttemptID);
+    wc.pullTaskToRun();
+    wc.verifyState(AMContainerState.LAUNCHING);
+    
+    TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+    wc.assignTaskAttempt(taID2);
+    
+    wc.verifyState(AMContainerState.STOP_REQUESTED);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.chh).unregister(wc.containerID);
+    // 1 for NM stop request. 2 TERMINATING to TaskAttempt.
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        NMCommunicatorEventType.CONTAINER_STOP_REQUEST,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATING);
+    assertTrue(wc.amContainer.isInErrorState());
+    
+    wc.nmStopSent();
+    wc.containerCompleted();
+    // 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+        AMSchedulerEventType.S_CONTAINER_COMPLETED);
+    
+//  assertNull(wc.amContainer.getRunningTaskAttempt()); // TODO Set/Unset properly.
+//  assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. Set/Unset properly.
+//  assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // TODO. Set/Unset properly.
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testContainerTimedOutAtRunning() {
+    WrappedContainer wc = new WrappedContainer();
+    List<Event> outgoingEvents;
+    
+    wc.launchContainer();
+    wc.containerLaunched();
+    wc.assignTaskAttempt(wc.taskAttemptID);
+    wc.pullTaskToRun();
+    wc.verifyState(AMContainerState.RUNNING);
+    
+    wc.containerTimedOut();
+    wc.verifyState(AMContainerState.STOP_REQUESTED);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.chh).unregister(wc.containerID);
+    // 1 to TA, 1 for RM de-allocate.
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+        NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
+    // TODO Should this be an RM DE-ALLOCATE instead ?
+    
+    wc.containerCompleted();
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+        AMSchedulerEventType.S_CONTAINER_COMPLETED);
+    
+    assertFalse(wc.amContainer.isInErrorState());
+    
+//  assertNull(wc.amContainer.getRunningTaskAttempt()); // TODO Set/Unset properly.
+//  assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. Set/Unset properly.
+//  assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size()); // TODO. Set/Unset properly.
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testLaunchFailure() {
+    WrappedContainer wc = new WrappedContainer();
+    List<Event> outgoingEvents;
+
+    wc.launchContainer();
+    wc.assignTaskAttempt(wc.taskAttemptID);
+    wc.verifyState(AMContainerState.LAUNCHING);
+    wc.launchFailed();
+    wc.verifyState(AMContainerState.STOPPING);
+    verify(wc.tal).registerRunningContainer(wc.containerID);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+        AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+
+    wc.containerCompleted();
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+        AMSchedulerEventType.S_CONTAINER_COMPLETED);
+
+    // Valid transition. Container complete, but not with an error.
+    assertFalse(wc.amContainer.isInErrorState());
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testContainerCompletedAtAllocated() {
+    WrappedContainer wc = new WrappedContainer();
+    List<Event> outgoingEvents;
+    wc.verifyState(AMContainerState.ALLOCATED);
+
+    wc.containerCompleted();
+    wc.verifyState(AMContainerState.COMPLETED);
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        AMSchedulerEventType.S_CONTAINER_COMPLETED);
+
+    assertFalse(wc.amContainer.isInErrorState());
+  }
+
+  @Ignore
+  @SuppressWarnings("rawtypes")
+  @Test
+  // Verify that incoming NM launched events to COMPLETED containers are
+  // handled.
+  public void testContainerCompletedAtLaunching() {
+    WrappedContainer wc = new WrappedContainer();
+    List<Event> outgoingEvents;
+
+    wc.launchContainer();
+    
+    
+    wc.assignTaskAttempt(wc.taskAttemptID);
+
+    wc.containerCompleted();
+    wc.verifyState(AMContainerState.COMPLETED);
+    verify(wc.tal).registerRunningContainer(wc.containerID);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        AMSchedulerEventType.S_CONTAINER_COMPLETED,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+    // TODO Failing because of an extra diagnostic event.
+    
+    assertFalse(wc.amContainer.isInErrorState());
+    
+    // Container launched generated by NM call.
+    wc.containerLaunched();
+    wc.verifyNoOutgoingEvents();
+    
+    assertFalse(wc.amContainer.isInErrorState());
+  }
+
+  @Ignore
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testContainerCompletedAtIdle() {
+    WrappedContainer wc = new WrappedContainer();
+    List<Event> outgoingEvents;
+
+    wc.launchContainer();
+    
+    wc.assignTaskAttempt(wc.taskAttemptID);
+    wc.containerLaunched();
+    wc.verifyState(AMContainerState.IDLE);
+
+    wc.containerCompleted();
+    wc.verifyState(AMContainerState.COMPLETED);
+    verify(wc.tal).registerRunningContainer(wc.containerID);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.chh).register(wc.containerID);
+    verify(wc.chh).unregister(wc.containerID);
+
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        AMSchedulerEventType.S_CONTAINER_COMPLETED,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+    // TODO Failing because of two extra diagnostic event.
+    
+    assertFalse(wc.amContainer.isInErrorState());
+    
+    // Pending pull request. (Ideally, container should be dead at this point
+    // and this event should not be generated. Network timeout on NM-RM heartbeat
+    // can cause it to be genreated)
+    wc.pullTaskToRun();
+    wc.verifyNoOutgoingEvents();
+    
+    assertFalse(wc.amContainer.isInErrorState());
+  }
+  
+  @Ignore
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testContainerCompletedAtRunning() {
+    WrappedContainer wc = new WrappedContainer();
+    List<Event> outgoingEvents;
+
+    wc.launchContainer();
+    
+    wc.assignTaskAttempt(wc.taskAttemptID);
+    wc.containerLaunched();
+    wc.pullTaskToRun();
+    wc.verifyState(AMContainerState.RUNNING);
+
+    wc.containerCompleted();
+    wc.verifyState(AMContainerState.COMPLETED);
+    verify(wc.tal).registerRunningContainer(wc.containerID);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.chh).register(wc.containerID);
+    verify(wc.chh).unregister(wc.containerID);
+
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        AMSchedulerEventType.S_CONTAINER_COMPLETED,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+    // TODO Failing because of two extra diagnostic event.
+    
+    assertFalse(wc.amContainer.isInErrorState());
+    
+    // Pending task complete. (Ideally, container should be dead at this point
+    // and this event should not be generated. Network timeout on NM-RM heartbeat
+    // can cause it to be genreated)
+    wc.taskAttemptSucceeded(wc.taskAttemptID);
+    wc.verifyNoOutgoingEvents();
+    
+    assertFalse(wc.amContainer.isInErrorState());
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testTaskAssignedToCompletedContainer() {
+    WrappedContainer wc = new WrappedContainer();
+    List<Event> outgoingEvents;
+    
+    wc.launchContainer();
+    wc.containerLaunched();
+    wc.assignTaskAttempt(wc.taskAttemptID);
+    wc.pullTaskToRun();
+    wc.taskAttemptSucceeded(wc.taskAttemptID);
+    
+    wc.containerCompleted();
+    wc.verifyState(AMContainerState.COMPLETED);
+    
+    TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+    
+    wc.assignTaskAttempt(taID2);
+    
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+    TaskAttemptEventContainerTerminated ctEvent = 
+        (TaskAttemptEventContainerTerminated) outgoingEvents.get(0);
+    assertEquals(taID2, ctEvent.getTaskAttemptID());
+  
+    // Allocation to a completed Container is considered an error.
+    // TODO Is this valid ?
+    assertTrue(wc.amContainer.isInErrorState());
+  }
+
+  @Test
+  public void testTaskPullAtLaunching() {
+    WrappedContainer wc = new WrappedContainer();
+    
+    wc.launchContainer();
+    AMContainerTask pulledTask = wc.pullTaskToRun();
+    wc.verifyState(AMContainerState.LAUNCHING);
+    wc.verifyNoOutgoingEvents();
+    assertFalse(pulledTask.shouldDie());
+    assertNull(pulledTask.getTask());
+  }
+  
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testNodeFailedAtIdle() {
+    WrappedContainer wc = new WrappedContainer();
+    List<Event> outgoingEvents;
+    
+    wc.launchContainer();
+    wc.containerLaunched();
+    wc.assignTaskAttempt(wc.taskAttemptID);
+    wc.verifyState(AMContainerState.IDLE);
+    
+    wc.nodeFailed();
+    // Expecting a complete event from the RM
+    wc.verifyState(AMContainerState.STOPPING);
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        TaskAttemptEventType.TA_NODE_FAILED,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+        AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+    
+    for (Event event : outgoingEvents) {
+      if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
+        TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
+        assertEquals("nodeFailed", nfEvent.getDiagnosticInfo());
+      }
+    }
+
+    wc.containerCompleted();
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+        AMSchedulerEventType.S_CONTAINER_COMPLETED);
+
+    assertFalse(wc.amContainer.isInErrorState());
+  }
+  
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testNodeFailedAtIdleMultipleAttempts() {
+    WrappedContainer wc = new WrappedContainer();
+    List<Event> outgoingEvents;
+
+    wc.launchContainer();
+    wc.containerLaunched();
+    wc.assignTaskAttempt(wc.taskAttemptID);
+    wc.pullTaskToRun();
+    wc.taskAttemptSucceeded(wc.taskAttemptID);
+    wc.verifyState(AMContainerState.IDLE);
+
+    TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+    wc.assignTaskAttempt(taID2);
+    wc.pullTaskToRun();
+    wc.taskAttemptSucceeded(taID2);
+    wc.verifyState(AMContainerState.IDLE);
+
+    wc.nodeFailed();
+    // Expecting a complete event from the RM
+    wc.verifyState(AMContainerState.STOPPING);
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        TaskAttemptEventType.TA_NODE_FAILED,
+        TaskAttemptEventType.TA_NODE_FAILED,
+        AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+
+    for (Event event : outgoingEvents) {
+      if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
+        TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
+        assertEquals("nodeFailed", nfEvent.getDiagnosticInfo());
+      }
+    }
+
+    assertFalse(wc.amContainer.isInErrorState());
+
+    wc.containerCompleted();
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        AMSchedulerEventType.S_CONTAINER_COMPLETED);
+    
+    assertNull(wc.amContainer.getRunningTaskAttempt());
+    assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
+    assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size());
+  }
+  
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testNodeFailedAtRunningMultipleAttempts() {
+    WrappedContainer wc = new WrappedContainer();
+    List<Event> outgoingEvents;
+    
+    wc.launchContainer();
+    wc.containerLaunched();
+    wc.assignTaskAttempt(wc.taskAttemptID);
+    wc.pullTaskToRun();
+    wc.taskAttemptSucceeded(wc.taskAttemptID);
+    
+    TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+    wc.assignTaskAttempt(taID2);
+    wc.pullTaskToRun();
+    wc.verifyState(AMContainerState.RUNNING);
+    
+    wc.nodeFailed();
+    // Expecting a complete event from the RM
+    wc.verifyState(AMContainerState.STOPPING);
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(4);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        TaskAttemptEventType.TA_NODE_FAILED,
+        TaskAttemptEventType.TA_NODE_FAILED,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+        AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+    
+    for (Event event : outgoingEvents) {
+      if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
+        TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
+        assertEquals("nodeFailed", nfEvent.getDiagnosticInfo());
+      }
+    }
+    
+    wc.containerCompleted();
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+        AMSchedulerEventType.S_CONTAINER_COMPLETED);
+
+    assertFalse(wc.amContainer.isInErrorState());
+//    assertNull(wc.amContainer.getRunningTaskAttempt()); // TODO. Set/Unset properly.
+//    assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. Set/Unset properly.
+//    assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // TODO. Set/Unset properly.
+  }
+  
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testNodeFailedAtCompletedMultipleSuccessfulTAs() {
+    WrappedContainer wc = new WrappedContainer();
+    List<Event> outgoingEvents;
+    
+    wc.launchContainer();
+    wc.containerLaunched();
+    wc.assignTaskAttempt(wc.taskAttemptID);
+    wc.pullTaskToRun();
+    wc.taskAttemptSucceeded(wc.taskAttemptID);
+    
+    TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+    wc.assignTaskAttempt(taID2);
+    wc.pullTaskToRun();
+    wc.taskAttemptSucceeded(taID2);
+    wc.stopRequest();
+    wc.nmStopSent();
+    wc.containerCompleted();
+    wc.verifyState(AMContainerState.COMPLETED);
+    
+    wc.nodeFailed();
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        TaskAttemptEventType.TA_NODE_FAILED,
+        TaskAttemptEventType.TA_NODE_FAILED);
+    
+    assertNull(wc.amContainer.getRunningTaskAttempt());
+    assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
+    assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size());
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testDuplicateCompletedEvents() {
+    WrappedContainer wc = new WrappedContainer();
+    List<Event> outgoingEvents;
+    
+    wc.launchContainer();
+    wc.containerLaunched();
+    wc.assignTaskAttempt(wc.taskAttemptID);
+    wc.pullTaskToRun();
+    wc.taskAttemptSucceeded(wc.taskAttemptID);
+    
+    TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+    wc.assignTaskAttempt(taID2);
+    wc.pullTaskToRun();
+    wc.taskAttemptSucceeded(taID2);
+    wc.stopRequest();
+    wc.nmStopSent();
+    wc.containerCompleted();
+    wc.verifyState(AMContainerState.COMPLETED);
+    
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        AMSchedulerEventType.S_CONTAINER_COMPLETED);
+   
+    wc.containerCompleted();
+    wc.verifyNoOutgoingEvents();
+  }
+  
+  
+  // TODO Verify diagnostics in most of the tests.
+  
+  private static class WrappedContainer {
+    
+    long rmIdentifier = 2000;
+    ApplicationId applicationID;
+    ApplicationAttemptId appAttemptID;
+    ContainerId containerID;
+    NodeId nodeID;
+    String nodeHttpAddress;
+    Resource resource;
+    Priority priority;
+    Container container;
+    ContainerHeartbeatHandler chh;
+    TaskAttemptListener tal;
+    
+    @SuppressWarnings("rawtypes")
+    EventHandler eventHandler;
+
+    AppContext appContext;
+
+    TezDAGID dagID;
+    TezVertexID vertexID;
+    TezTaskID taskID;
+    TezTaskAttemptID taskAttemptID;
+    
+    TezTaskContext tezTaskContext;
+
+    Token<JobTokenIdentifier> jobToken;
+    
+    public AMContainerImpl amContainer;
+    
+    @SuppressWarnings("unchecked")
+    public WrappedContainer() {
+      applicationID = BuilderUtils.newApplicationId(rmIdentifier, 1);
+      appAttemptID = BuilderUtils.newApplicationAttemptId(applicationID, 1);
+      containerID = BuilderUtils.newContainerId(appAttemptID, 1);
+      nodeID = BuilderUtils.newNodeId("host", 12500);
+      nodeHttpAddress = "host:12501";
+      resource = BuilderUtils.newResource(1024, 1);
+      priority = BuilderUtils.newPriority(1);
+      container = BuilderUtils.newContainer(containerID, nodeID,
+          nodeHttpAddress, resource, priority, null, rmIdentifier);
+      chh = mock(ContainerHeartbeatHandler.class);
+      
+      InetSocketAddress addr = new InetSocketAddress("localhost", 0);
+      tal = mock(TaskAttemptListener.class);
+      doReturn(addr).when(tal).getAddress();
+      
+      eventHandler = mock(EventHandler.class);
+      
+      appContext = mock(AppContext.class);
+      doReturn(new HashMap<ApplicationAccessType, String>()).when(appContext)
+      .getApplicationACLs();
+      doReturn(eventHandler).when(appContext).getEventHandler();
+
+      dagID = new TezDAGID(applicationID, 1);
+      vertexID = new TezVertexID(dagID, 1);
+      taskID = new TezTaskID(vertexID, 1);
+      taskAttemptID = new TezTaskAttemptID(taskID, 1);
+      
+      tezTaskContext = mock(TezTaskContext.class);
+      doReturn(taskAttemptID).when(tezTaskContext).getTaskAttemptId();
+
+      
+      jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
+      
+      amContainer = new AMContainerImpl(container, chh, tal,
+          appContext);
+    }
+
+    /**
+     * Verifies no additional outgoing events generated by the last incoming
+     * event to the AMContainer.
+     */
+    @SuppressWarnings("unchecked")
+    public void verifyNoOutgoingEvents() {
+      verify(eventHandler, never()).handle(any(Event.class));
+    }
+    
+    /**
+     * Returns a list of outgoing events generated by the last incoming event to
+     * the AMContainer. 
+     * @param invocations number of expected invocations.
+     * 
+     * @return a list of outgoing events from the AMContainer.
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public List<Event> verifyCountAndGetOutgoingEvents(int invocations) {
+      ArgumentCaptor<Event> args = ArgumentCaptor.forClass(Event.class);
+      verify(eventHandler, times(invocations)).handle(args.capture());
+      return args.getAllValues();
+    }
+
+    public void launchContainer() {
+      reset(eventHandler);
+      amContainer.handle(new AMContainerEventLaunchRequest(containerID, vertexID,
+          jobToken, new Credentials(), false, new TezConfiguration(),
+          new HashMap<String, LocalResource>(), new HashMap<String, String>(),
+          null));
+    }
+    
+    public void assignTaskAttempt(TezTaskAttemptID taID) {
+      reset(eventHandler);
+      amContainer.handle(new AMContainerEventAssignTA(containerID, taID,
+          tezTaskContext)); 
+    }
+    
+    public AMContainerTask pullTaskToRun() {
+      reset(eventHandler);
+      return amContainer.pullTaskContext();
+    }
+
+    public void containerLaunched() {
+      reset(eventHandler);
+      amContainer.handle(new AMContainerEventLaunched(containerID, 3000));
+    }
+
+    public void taskAttemptSucceeded(TezTaskAttemptID taID) {
+      reset(eventHandler);
+      amContainer.handle(new AMContainerEventTASucceeded(containerID, taID));
+    }
+    
+    public void stopRequest() {
+      reset(eventHandler);
+      amContainer.handle(new AMContainerEvent(containerID,
+          AMContainerEventType.C_STOP_REQUEST));
+    }
+    
+    public void nmStopSent() {
+      reset(eventHandler);
+      amContainer.handle(new AMContainerEvent(containerID,
+          AMContainerEventType.C_NM_STOP_SENT));
+    }
+    
+    public void nmStopFailed() {
+      reset(eventHandler);
+      amContainer.handle(new AMContainerEvent(containerID,
+          AMContainerEventType.C_NM_STOP_FAILED));
+    }
+    
+    public void containerCompleted() {
+      reset(eventHandler);
+      ContainerStatus cStatus = ContainerStatus.newInstance(containerID,
+          ContainerState.COMPLETE, "", 100);
+      amContainer.handle(new AMContainerEventCompleted(cStatus));
+    }
+    
+    public void containerTimedOut() {
+      reset(eventHandler);
+      amContainer.handle(new AMContainerEvent(containerID,
+          AMContainerEventType.C_TIMED_OUT));
+    }
+    
+    public void launchFailed() {
+      reset(eventHandler);
+      amContainer.handle(new AMContainerEventLaunchFailed(containerID,
+          "launchFailed"));
+    }
+    
+    public void nodeFailed() {
+      reset(eventHandler);
+      amContainer.handle(new AMContainerEventNodeFailed(containerID,
+          "nodeFailed"));
+    }
+    
+    public void verifyState(AMContainerState state) {
+      assertEquals(
+          "Expected state: " + state + ", but found: " + amContainer.getState(),
+          state, amContainer.getState());
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  private void verifyUnOrderedOutgoingEventTypes(List<Event> events,
+      Enum<?>... expectedTypes) {
+
+    List<Enum<?>> expectedTypeList = new LinkedList<Enum<?>>();
+    for (Enum<?> expectedType : expectedTypes) {
+      expectedTypeList.add(expectedType);
+    }
+    List<Event> eventsCopy = new LinkedList<Event>(events);
+
+    Iterator<Enum<?>> expectedTypeIterator = expectedTypeList.iterator();
+    while (expectedTypeIterator.hasNext()) {
+      Enum<?> expectedType = expectedTypeIterator.next();
+      Iterator<Event> iter = eventsCopy.iterator();
+      while (iter.hasNext()) {
+        Event e = iter.next();
+        if (e.getType() == expectedType) {
+          iter.remove();
+          expectedTypeIterator.remove();
+          break;
+        }
+      }
+    }
+    assertTrue("Did not find types : " + expectedTypeList
+        + " in outgoing event list", expectedTypeList.isEmpty());
+    assertTrue("Found unexpected events: " + eventsCopy
+        + " in outgoing event list", eventsCopy.isEmpty());
+  }
+}