You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/05/29 23:18:25 UTC
git commit: TEZ-113. Add unit tests for AMContainer. (sseth)
Updated Branches:
refs/heads/TEZ-1 2340884c3 -> a74b43673
TEZ-113. Add unit tests for AMContainer. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/a74b4367
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/a74b4367
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/a74b4367
Branch: refs/heads/TEZ-1
Commit: a74b43673dddf2f9d488f70b3bc337577b1c8dfa
Parents: 2340884
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed May 29 14:18:00 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed May 29 14:18:00 2013 -0700
----------------------------------------------------------------------
.../java/org/apache/tez/dag/app/AppContext.java | 2 +-
.../tez/dag/app/TaskAttemptListenerImpTezDag.java | 4 +-
.../dag/app/rm/container/AMContainerHelpers.java | 1 -
.../tez/dag/app/rm/container/AMContainerImpl.java | 19 +-
.../tez/dag/app/rm/container/TestAMContainer.java | 1008 +++++++++++++++
5 files changed, 1024 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a74b4367/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 041de56..03e1059 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -53,7 +53,7 @@ public interface AppContext {
long getStartTime();
- CharSequence getUser();
+ String getUser();
DAG getDAG();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a74b4367/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index d974cd8..7600db4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -504,9 +504,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
return COMPLETION_RESPONSE_NO_WAIT;
}
-
-
- // TODO EVENTUALLY remove all mrv2 ids.
+
@Override
public void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
attemptToContainerIdMap.remove(attemptId);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a74b4367/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 6a9ae2e..357eeda 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -172,7 +172,6 @@ public class AMContainerHelpers {
Map<String, String> myEnv = new HashMap<String, String>(env.size());
myEnv.putAll(env);
myEnv.putAll(vertexEnv);
- // TODO TEZ-38 MRChildJVM2.setEnv should become a no-op
// Set up the launch command
List<String> commands = TezEngineChildJVM.getVMCommand(
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a74b4367/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 1089013..51246dd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -54,8 +54,6 @@ import org.apache.tez.dag.app.rm.AMSchedulerEventContainerCompleted;
import org.apache.tez.dag.app.rm.AMSchedulerEventDeallocateContainer;
import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
@SuppressWarnings("rawtypes")
@@ -156,7 +154,7 @@ public class AMContainerImpl implements AMContainer {
.addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, new TimedOutAtRunningTransition())
.addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtRunningTransition())
.addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), new ErrorAtRunningTransition())
-
+
.addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
.addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
.addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition())
@@ -168,11 +166,12 @@ public class AMContainerImpl implements AMContainer {
.addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
.addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
+ // TODO This transition is wrong. Should be a noop / error.
.addTransition(AMContainerState.STOPPING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition())
.addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
.addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
.addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_REQUEST, new ErrorAtStoppingTransition())
-
+
.addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtCompletedTransition())
.addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
.addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
@@ -236,7 +235,11 @@ public class AMContainerImpl implements AMContainer {
public List<TezTaskAttemptID> getQueuedTaskAttempts() {
readLock.lock();
try {
- return Collections.singletonList(this.pendingAttempt);
+ if (pendingAttempt != null) {
+ return Collections.singletonList(this.pendingAttempt);
+ } else {
+ return Collections.emptyList();
+ }
} finally {
readLock.unlock();
}
@@ -261,6 +264,10 @@ public class AMContainerImpl implements AMContainer {
readLock.unlock();
}
}
+
+ public boolean isInErrorState() { // Accessor for the inError flag; the TestAMContainer tests added in this commit call isInErrorState() on wc.amContainer.
+ return inError; // NOTE(review): read without readLock, unlike getQueuedTaskAttempts() shown earlier in this diff - confirm an unlocked read of inError is safe.
+ }
@Override
public void handle(AMContainerEvent event) {
@@ -333,6 +340,8 @@ public class AMContainerImpl implements AMContainer {
container.taskAttemptListener, event.getCredentials(),
event.shouldProfile(), container.appContext);
+ // Registering now, so that in case of delayed NM response, the child
+ // task is not told to die since the TAL does not know about the container.
container.registerWithTAListener();
container.sendStartRequestToNM();
LOG.info("Sending Launch Request for Container with id: " +
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a74b4367/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
new file mode 100644
index 0000000..574a2c5
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -0,0 +1,1008 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.tez.common.TezTaskContext;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ContainerHeartbeatHandler;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.app.rm.AMSchedulerEventType;
+import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+public class TestAMContainer {
+
+
+ @Test
+ // Happy path with the task attempt assigned before the container is launched: ALLOCATED -> LAUNCHING -> IDLE -> RUNNING -> IDLE -> COMPLETED, ending without the error flag set.
+ public void tetSingleSuccessfulTaskFlow() { // NOTE(review): name looks like a typo of "testSingleSuccessfulTaskFlow"; the method is still annotated with @Test.
+ WrappedContainer wc = new WrappedContainer();
+
+ wc.verifyState(AMContainerState.ALLOCATED);
+
+ // Launch request.
+ wc.launchContainer();
+ wc.verifyState(AMContainerState.LAUNCHING);
+ // 1 Launch request.
+ wc.verifyCountAndGetOutgoingEvents(1);
+
+ // Assign task while still LAUNCHING: the attempt is only queued, nothing is sent out.
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ wc.verifyState(AMContainerState.LAUNCHING);
+ wc.verifyNoOutgoingEvents();
+ assertEquals(wc.taskAttemptID, wc.amContainer.getQueuedTaskAttempts()
+ .get(0));
+
+ // Container Launched: the attempt stays queued and nothing is running yet.
+ wc.containerLaunched();
+ wc.verifyState(AMContainerState.IDLE);
+ wc.verifyNoOutgoingEvents();
+ assertEquals(wc.taskAttemptID, wc.amContainer.getQueuedTaskAttempts()
+ .get(0));
+ assertNull(wc.amContainer.getRunningTaskAttempt());
+ verify(wc.tal).registerRunningContainer(wc.containerID);
+ verify(wc.chh).register(wc.containerID);
+
+ // Pull TA: the queued attempt becomes the running attempt.
+ AMContainerTask pulledTask = wc.pullTaskToRun();
+ wc.verifyState(AMContainerState.RUNNING);
+ wc.verifyNoOutgoingEvents();
+ assertFalse(pulledTask.shouldDie());
+ assertEquals(wc.tezTaskContext.getTaskAttemptId(), pulledTask.getTask()
+ .getTaskAttemptId());
+ assertEquals(wc.taskAttemptID, wc.amContainer.getRunningTaskAttempt());
+ assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
+ // Task attempt succeeds: back to IDLE with no running attempt.
+ wc.taskAttemptSucceeded(wc.taskAttemptID);
+ wc.verifyState(AMContainerState.IDLE);
+ wc.verifyNoOutgoingEvents();
+ assertNull(wc.amContainer.getRunningTaskAttempt());
+ verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
+ // Container completes.
+ wc.containerCompleted();
+ wc.verifyState(AMContainerState.COMPLETED);
+ // 1 Scheduler completed event.
+ wc.verifyCountAndGetOutgoingEvents(1);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.chh).unregister(wc.containerID);
+
+ assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size());
+ assertFalse(wc.amContainer.isInErrorState());
+ }
+
+ @Test
+ // Happy path with the task attempt assigned after the container has launched (assignment happens in IDLE rather than LAUNCHING).
+ public void testSingleSuccessfulTaskFlow2() {
+ WrappedContainer wc = new WrappedContainer();
+
+ wc.verifyState(AMContainerState.ALLOCATED);
+
+ // Launch request.
+ wc.launchContainer();
+ wc.verifyState(AMContainerState.LAUNCHING);
+ // 1 Launch request.
+ wc.verifyCountAndGetOutgoingEvents(1);
+
+ // Container Launched: nothing has been assigned yet, so the queue is empty.
+ wc.containerLaunched();
+ wc.verifyState(AMContainerState.IDLE);
+ wc.verifyNoOutgoingEvents();
+ assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
+ verify(wc.tal).registerRunningContainer(wc.containerID);
+ verify(wc.chh).register(wc.containerID);
+
+ // Assign task: queued in IDLE, not yet running.
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ wc.verifyState(AMContainerState.IDLE);
+ wc.verifyNoOutgoingEvents();
+ assertEquals(wc.taskAttemptID, wc.amContainer.getQueuedTaskAttempts()
+ .get(0));
+ assertNull(wc.amContainer.getRunningTaskAttempt());
+
+ // Pull TA: the queued attempt becomes the running attempt.
+ AMContainerTask pulledTask = wc.pullTaskToRun();
+ wc.verifyState(AMContainerState.RUNNING);
+ wc.verifyNoOutgoingEvents();
+ assertFalse(pulledTask.shouldDie());
+ assertEquals(wc.tezTaskContext.getTaskAttemptId(), pulledTask.getTask()
+ .getTaskAttemptId());
+ assertEquals(wc.taskAttemptID, wc.amContainer.getRunningTaskAttempt());
+ assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
+ // Task attempt succeeds: back to IDLE with no running attempt.
+ wc.taskAttemptSucceeded(wc.taskAttemptID);
+ wc.verifyState(AMContainerState.IDLE);
+ wc.verifyNoOutgoingEvents();
+ assertNull(wc.amContainer.getRunningTaskAttempt());
+ verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
+ // Container completes.
+ wc.containerCompleted();
+ wc.verifyState(AMContainerState.COMPLETED);
+ // 1 Scheduler completed event.
+ wc.verifyCountAndGetOutgoingEvents(1);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.chh).unregister(wc.containerID);
+
+ assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size());
+ assertFalse(wc.amContainer.isInErrorState());
+ }
+
+ @Test
+ public void testSingleSuccessfulTaskFlowStopRequest() { // One attempt succeeds, then an explicit stop request is made: STOP_REQUESTED -> STOPPING (NM stop sent) -> COMPLETED.
+ WrappedContainer wc = new WrappedContainer();
+
+ wc.verifyState(AMContainerState.ALLOCATED);
+ // Run a single attempt to completion before asking for the stop.
+ wc.launchContainer();
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ wc.containerLaunched();
+ wc.pullTaskToRun();
+ wc.taskAttemptSucceeded(wc.taskAttemptID);
+
+ wc.stopRequest();
+ wc.verifyState(AMContainerState.STOP_REQUESTED);
+ // Event to NM to stop the container.
+ wc.verifyCountAndGetOutgoingEvents(1); // NOTE(review): the next statement repeats this same count check; one of the two calls looks redundant.
+ assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() ==
+ NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
+
+ wc.nmStopSent();
+ wc.verifyState(AMContainerState.STOPPING);
+ wc.verifyNoOutgoingEvents();
+
+ wc.containerCompleted();
+ wc.verifyState(AMContainerState.COMPLETED);
+ // 1 Scheduler completed event.
+ wc.verifyCountAndGetOutgoingEvents(1);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.chh).unregister(wc.containerID);
+ // Final bookkeeping: nothing queued or running, one completed attempt, no error flag.
+ assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
+ assertNull(wc.amContainer.getRunningTaskAttempt());
+ assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size());
+ assertFalse(wc.amContainer.isInErrorState());
+ }
+
+ @Test
+ public void testSingleSuccessfulTaskFlowFailedNMStopRequest() { // Same as the stop-request flow, but the NM stop fails; the test then expects a scheduler de-allocate event.
+ WrappedContainer wc = new WrappedContainer();
+
+ wc.verifyState(AMContainerState.ALLOCATED);
+ // Run a single attempt to completion before asking for the stop.
+ wc.launchContainer();
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ wc.containerLaunched();
+ wc.pullTaskToRun();
+ wc.taskAttemptSucceeded(wc.taskAttemptID);
+
+ wc.stopRequest();
+ wc.verifyState(AMContainerState.STOP_REQUESTED);
+ // Event to NM to stop the container.
+ wc.verifyCountAndGetOutgoingEvents(1); // NOTE(review): the next statement repeats this same count check; one of the two calls looks redundant.
+ assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() ==
+ NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
+
+ wc.nmStopFailed();
+ wc.verifyState(AMContainerState.STOPPING);
+ // Event to ask a RM container release.
+ wc.verifyCountAndGetOutgoingEvents(1); // NOTE(review): repeated by the next statement as well.
+ assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() ==
+ AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+
+ wc.containerCompleted();
+ wc.verifyState(AMContainerState.COMPLETED);
+ // 1 Scheduler completed event.
+ wc.verifyCountAndGetOutgoingEvents(1);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.chh).unregister(wc.containerID);
+ // Final bookkeeping: nothing queued or running, one completed attempt, no error flag.
+ assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
+ assertNull(wc.amContainer.getRunningTaskAttempt());
+ assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size());
+ assertFalse(wc.amContainer.isInErrorState());
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testMultipleAllocationsAtIdle() { // A second assignment while the first attempt is still queued at IDLE must stop the container and set the error flag.
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+
+ wc.launchContainer();
+ wc.containerLaunched();
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ wc.verifyState(AMContainerState.IDLE);
+ // Second attempt is assigned before the first one has been pulled.
+ TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+ wc.assignTaskAttempt(taID2);
+
+ wc.verifyState(AMContainerState.STOP_REQUESTED);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.chh).unregister(wc.containerID);
+ // 1 for NM stop request. 2 TERMINATING to TaskAttempt.
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ NMCommunicatorEventType.CONTAINER_STOP_REQUEST,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATING);
+ assertTrue(wc.amContainer.isInErrorState());
+
+ wc.nmStopSent();
+ wc.containerCompleted();
+ // 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ AMSchedulerEventType.S_CONTAINER_COMPLETED);
+ // Only the running-attempt check is active; the queued/completed checks below are disabled pending the TODOs.
+ assertNull(wc.amContainer.getRunningTaskAttempt());
+// assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. Set/Unset properly.
+// assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // TODO. Set/Unset properly.
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testAllocationAtRunning() { // Assigning a second attempt while one is RUNNING must stop the container and set the error flag.
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+
+ wc.launchContainer();
+ wc.containerLaunched();
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ wc.pullTaskToRun();
+ wc.verifyState(AMContainerState.RUNNING);
+ // Second attempt is assigned while the first is still running.
+ TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+ wc.assignTaskAttempt(taID2);
+
+ wc.verifyState(AMContainerState.STOP_REQUESTED);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.chh).unregister(wc.containerID);
+ // 1 for NM stop request. 2 TERMINATING to TaskAttempt.
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ NMCommunicatorEventType.CONTAINER_STOP_REQUEST,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATING);
+ assertTrue(wc.amContainer.isInErrorState());
+
+ wc.nmStopSent();
+ wc.containerCompleted();
+ // 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ AMSchedulerEventType.S_CONTAINER_COMPLETED);
+ // The attempt-bookkeeping checks below are disabled pending the TODOs.
+// assertNull(wc.amContainer.getRunningTaskAttempt()); // TODO Set/Unset properly.
+// assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. Set/Unset properly.
+// assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // TODO. Set/Unset properly.
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testMultipleAllocationsAtLaunching() { // A second assignment while the container is still LAUNCHING must stop the container and set the error flag.
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+
+ wc.launchContainer();
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ wc.pullTaskToRun(); // Pull before the launch is confirmed; the state is still LAUNCHING afterwards (checked on the next line).
+ wc.verifyState(AMContainerState.LAUNCHING);
+ // Second attempt is assigned before the container has launched.
+ TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+ wc.assignTaskAttempt(taID2);
+
+ wc.verifyState(AMContainerState.STOP_REQUESTED);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.chh).unregister(wc.containerID);
+ // 1 for NM stop request. 2 TERMINATING to TaskAttempt.
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ NMCommunicatorEventType.CONTAINER_STOP_REQUEST,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATING);
+ assertTrue(wc.amContainer.isInErrorState());
+
+ wc.nmStopSent();
+ wc.containerCompleted();
+ // 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ AMSchedulerEventType.S_CONTAINER_COMPLETED);
+ // The attempt-bookkeeping checks below are disabled pending the TODOs.
+// assertNull(wc.amContainer.getRunningTaskAttempt()); // TODO Set/Unset properly.
+// assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. Set/Unset properly.
+// assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // TODO. Set/Unset properly.
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testContainerTimedOutAtRunning() { // A timeout while RUNNING moves the container to STOP_REQUESTED and is not flagged as an error.
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+
+ wc.launchContainer();
+ wc.containerLaunched();
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ wc.pullTaskToRun();
+ wc.verifyState(AMContainerState.RUNNING);
+
+ wc.containerTimedOut();
+ wc.verifyState(AMContainerState.STOP_REQUESTED);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.chh).unregister(wc.containerID);
+ // 1 TERMINATING to TA, 1 stop request to the NM (the assertion below checks CONTAINER_STOP_REQUEST, not an RM de-allocate).
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+ NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
+ // TODO Should this be an RM DE-ALLOCATE instead ?
+
+ wc.containerCompleted(); // 1 TERMINATED to TA, 1 completed event to the scheduler.
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ AMSchedulerEventType.S_CONTAINER_COMPLETED);
+
+ assertFalse(wc.amContainer.isInErrorState());
+ // The attempt-bookkeeping checks below are disabled pending the TODOs.
+// assertNull(wc.amContainer.getRunningTaskAttempt()); // TODO Set/Unset properly.
+// assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. Set/Unset properly.
+// assertEquals(1, wc.amContainer.getCompletedTaskAttempts().size()); // TODO. Set/Unset properly.
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testLaunchFailure() { // A launch failure with one attempt queued moves LAUNCHING -> STOPPING and is not flagged as an error.
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+
+ wc.launchContainer();
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ wc.verifyState(AMContainerState.LAUNCHING);
+ wc.launchFailed();
+ wc.verifyState(AMContainerState.STOPPING);
+ verify(wc.tal).registerRunningContainer(wc.containerID); // Registered with the listener even though the launch never succeeded.
+ verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ // 1 TERMINATING to the queued TaskAttempt, 1 de-allocate to the scheduler.
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+ AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+
+ wc.containerCompleted(); // 1 TERMINATED to the TaskAttempt, 1 completed event to the scheduler.
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ AMSchedulerEventType.S_CONTAINER_COMPLETED);
+
+ // Valid transition. Container complete, but not with an error.
+ assertFalse(wc.amContainer.isInErrorState());
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testContainerCompletedAtAllocated() { // A completion arriving before any launch request goes ALLOCATED -> COMPLETED with a single scheduler event.
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+ wc.verifyState(AMContainerState.ALLOCATED);
+
+ wc.containerCompleted();
+ wc.verifyState(AMContainerState.COMPLETED);
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ AMSchedulerEventType.S_CONTAINER_COMPLETED);
+ // Completing an unused container is not an error.
+ assertFalse(wc.amContainer.isInErrorState());
+ }
+
+ @Ignore
+ @SuppressWarnings("rawtypes")
+ @Test
+ // Verify that incoming NM launched events to COMPLETED containers are
+ // handled. Currently disabled via @Ignore; see the TODO below for the reason.
+ public void testContainerCompletedAtLaunching() {
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+
+ wc.launchContainer();
+
+
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ // Completion arrives while the container is still launching.
+ wc.containerCompleted();
+ wc.verifyState(AMContainerState.COMPLETED);
+ verify(wc.tal).registerRunningContainer(wc.containerID);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID);
+
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ AMSchedulerEventType.S_CONTAINER_COMPLETED,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+ // TODO Failing because of an extra diagnostic event.
+
+ assertFalse(wc.amContainer.isInErrorState());
+
+ // Container launched generated by NM call.
+ wc.containerLaunched();
+ wc.verifyNoOutgoingEvents();
+ // The late launched event must not flip the container into the error state.
+ assertFalse(wc.amContainer.isInErrorState());
+ }
+
+ @Ignore
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testContainerCompletedAtIdle() { // Completion while IDLE with one attempt queued, followed by a late pull. Currently disabled via @Ignore; see the TODO below.
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+
+ wc.launchContainer();
+
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ wc.containerLaunched();
+ wc.verifyState(AMContainerState.IDLE);
+
+ wc.containerCompleted();
+ wc.verifyState(AMContainerState.COMPLETED);
+ verify(wc.tal).registerRunningContainer(wc.containerID);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.chh).register(wc.containerID);
+ verify(wc.chh).unregister(wc.containerID);
+
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ AMSchedulerEventType.S_CONTAINER_COMPLETED,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+ // TODO Failing because of two extra diagnostic events.
+
+ assertFalse(wc.amContainer.isInErrorState());
+
+ // Pending pull request. (Ideally, container should be dead at this point
+ // and this event should not be generated. Network timeout on NM-RM heartbeat
+ // can cause it to be generated)
+ wc.pullTaskToRun();
+ wc.verifyNoOutgoingEvents();
+ // The late pull must not flip the container into the error state.
+ assertFalse(wc.amContainer.isInErrorState());
+ }
+
+ @Ignore
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testContainerCompletedAtRunning() { // Completion while an attempt is RUNNING, followed by a late task-succeeded event. Currently disabled via @Ignore; see the TODO below.
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+
+ wc.launchContainer();
+
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ wc.containerLaunched();
+ wc.pullTaskToRun();
+ wc.verifyState(AMContainerState.RUNNING);
+
+ wc.containerCompleted();
+ wc.verifyState(AMContainerState.COMPLETED);
+ verify(wc.tal).registerRunningContainer(wc.containerID);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.chh).register(wc.containerID);
+ verify(wc.chh).unregister(wc.containerID);
+
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ AMSchedulerEventType.S_CONTAINER_COMPLETED,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+ // TODO Failing because of two extra diagnostic events.
+
+ assertFalse(wc.amContainer.isInErrorState());
+
+ // Pending task complete. (Ideally, container should be dead at this point
+ // and this event should not be generated. Network timeout on NM-RM heartbeat
+ // can cause it to be generated)
+ wc.taskAttemptSucceeded(wc.taskAttemptID);
+ wc.verifyNoOutgoingEvents();
+ // The late success event must not flip the container into the error state.
+ assertFalse(wc.amContainer.isInErrorState());
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testTaskAssignedToCompletedContainer() { // Assigning an attempt to an already COMPLETED container yields a TERMINATED event for that attempt and sets the error flag.
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+ // Run one attempt to success and let the container complete.
+ wc.launchContainer();
+ wc.containerLaunched();
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ wc.pullTaskToRun();
+ wc.taskAttemptSucceeded(wc.taskAttemptID);
+
+ wc.containerCompleted();
+ wc.verifyState(AMContainerState.COMPLETED);
+
+ TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+ // Late assignment to the completed container.
+ wc.assignTaskAttempt(taID2);
+
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+ TaskAttemptEventContainerTerminated ctEvent =
+ (TaskAttemptEventContainerTerminated) outgoingEvents.get(0);
+ assertEquals(taID2, ctEvent.getTaskAttemptID()); // The event must target the newly assigned attempt, not the first one.
+
+ // Allocation to a completed Container is considered an error.
+ // TODO Is this valid ?
+ assertTrue(wc.amContainer.isInErrorState());
+ }
+
+ @Test
+ public void testTaskPullAtLaunching() { // A pull while still LAUNCHING with nothing assigned returns an empty, non-fatal response and leaves the state unchanged.
+ WrappedContainer wc = new WrappedContainer();
+
+ wc.launchContainer();
+ AMContainerTask pulledTask = wc.pullTaskToRun();
+ wc.verifyState(AMContainerState.LAUNCHING);
+ wc.verifyNoOutgoingEvents();
+ assertFalse(pulledTask.shouldDie()); // The response does not ask the puller to die...
+ assertNull(pulledTask.getTask()); // ...and carries no task.
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testNodeFailedAtIdle() { // A node failure while IDLE with one attempt queued moves the container to STOPPING and is not flagged as an error.
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+
+ wc.launchContainer();
+ wc.containerLaunched();
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ wc.verifyState(AMContainerState.IDLE);
+
+ wc.nodeFailed();
+ // Expecting a complete event from the RM
+ wc.verifyState(AMContainerState.STOPPING);
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); // NODE_FAILED and TERMINATING to the TaskAttempt, de-allocate to the scheduler.
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ TaskAttemptEventType.TA_NODE_FAILED,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+ AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+ // Each NODE_FAILED event must carry the "nodeFailed" diagnostic string.
+ for (Event event : outgoingEvents) {
+ if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
+ TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
+ assertEquals("nodeFailed", nfEvent.getDiagnosticInfo());
+ }
+ }
+
+ wc.containerCompleted(); // 1 TERMINATED to the TaskAttempt, 1 completed event to the scheduler.
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ AMSchedulerEventType.S_CONTAINER_COMPLETED);
+
+ assertFalse(wc.amContainer.isInErrorState());
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testNodeFailedAtIdleMultipleAttempts() { // Node failure while IDLE after two attempts have already succeeded on the container.
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+
+ wc.launchContainer();
+ wc.containerLaunched();
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ wc.pullTaskToRun();
+ wc.taskAttemptSucceeded(wc.taskAttemptID);
+ wc.verifyState(AMContainerState.IDLE);
+
+ TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2); // Attempt 2 of the same task as wc.taskAttemptID.
+ wc.assignTaskAttempt(taID2);
+ wc.pullTaskToRun();
+ wc.taskAttemptSucceeded(taID2);
+ wc.verifyState(AMContainerState.IDLE);
+
+ wc.nodeFailed();
+ // Expecting a complete event from the RM
+ wc.verifyState(AMContainerState.STOPPING);
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ TaskAttemptEventType.TA_NODE_FAILED, // Two TA_NODE_FAILED expected; presumably one per succeeded attempt.
+ TaskAttemptEventType.TA_NODE_FAILED,
+ AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+
+ for (Event event : outgoingEvents) { // Check the diagnostic carried by the TA_NODE_FAILED events only.
+ if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
+ TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
+ assertEquals("nodeFailed", nfEvent.getDiagnosticInfo()); // "nodeFailed" is the message WrappedContainer.nodeFailed() sends.
+ }
+ }
+
+ assertFalse(wc.amContainer.isInErrorState());
+
+ wc.containerCompleted();
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); // Only the scheduler notification is expected; no task-attempt event.
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ AMSchedulerEventType.S_CONTAINER_COMPLETED);
+
+ assertNull(wc.amContainer.getRunningTaskAttempt());
+ assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
+ assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // Both succeeded attempts remain recorded as completed.
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testNodeFailedAtRunningMultipleAttempts() { // Node failure while RUNNING a second attempt, after a first attempt succeeded.
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+
+ wc.launchContainer();
+ wc.containerLaunched();
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ wc.pullTaskToRun();
+ wc.taskAttemptSucceeded(wc.taskAttemptID);
+
+ TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2); // Attempt 2 of the same task as wc.taskAttemptID.
+ wc.assignTaskAttempt(taID2);
+ wc.pullTaskToRun(); // Pulled but never reported as succeeded, so the container is RUNNING when the node fails.
+ wc.verifyState(AMContainerState.RUNNING);
+
+ wc.nodeFailed();
+ // Expecting a complete event from the RM
+ wc.verifyState(AMContainerState.STOPPING);
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(4);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ TaskAttemptEventType.TA_NODE_FAILED, // Two TA_NODE_FAILED expected; presumably for the succeeded and the running attempt.
+ TaskAttemptEventType.TA_NODE_FAILED,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+ AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+
+ for (Event event : outgoingEvents) { // Check the diagnostic carried by the TA_NODE_FAILED events only.
+ if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
+ TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
+ assertEquals("nodeFailed", nfEvent.getDiagnosticInfo()); // "nodeFailed" is the message WrappedContainer.nodeFailed() sends.
+ }
+ }
+
+ wc.containerCompleted(); // Completion arriving while in STOPPING.
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ AMSchedulerEventType.S_CONTAINER_COMPLETED);
+
+ assertFalse(wc.amContainer.isInErrorState());
+// assertNull(wc.amContainer.getRunningTaskAttempt()); // TODO. Set/Unset properly.
+// assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size()); // TODO. Set/Unset properly.
+// assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // TODO. Set/Unset properly.
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testNodeFailedAtCompletedMultipleSuccessfulTAs() { // Node failure reported after the container is already COMPLETED with two succeeded attempts.
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+
+ wc.launchContainer();
+ wc.containerLaunched();
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ wc.pullTaskToRun();
+ wc.taskAttemptSucceeded(wc.taskAttemptID);
+
+ TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2); // Attempt 2 of the same task as wc.taskAttemptID.
+ wc.assignTaskAttempt(taID2);
+ wc.pullTaskToRun();
+ wc.taskAttemptSucceeded(taID2);
+ wc.stopRequest(); // Stop sequence: stop request, NM stop sent, then the completion event.
+ wc.nmStopSent();
+ wc.containerCompleted();
+ wc.verifyState(AMContainerState.COMPLETED);
+
+ wc.nodeFailed();
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); // Only task-attempt events are expected; no scheduler event.
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ TaskAttemptEventType.TA_NODE_FAILED,
+ TaskAttemptEventType.TA_NODE_FAILED);
+
+ assertNull(wc.amContainer.getRunningTaskAttempt());
+ assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
+ assertEquals(2, wc.amContainer.getCompletedTaskAttempts().size()); // Both succeeded attempts remain recorded as completed.
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testDuplicateCompletedEvents() { // Delivers the container-completed event twice; the second must produce no outgoing events.
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+
+ wc.launchContainer();
+ wc.containerLaunched();
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ wc.pullTaskToRun();
+ wc.taskAttemptSucceeded(wc.taskAttemptID);
+
+ TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2); // Attempt 2 of the same task as wc.taskAttemptID.
+ wc.assignTaskAttempt(taID2);
+ wc.pullTaskToRun();
+ wc.taskAttemptSucceeded(taID2);
+ wc.stopRequest();
+ wc.nmStopSent();
+ wc.containerCompleted(); // First completion event.
+ wc.verifyState(AMContainerState.COMPLETED);
+
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); // Counts only events since the reset inside containerCompleted().
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ AMSchedulerEventType.S_CONTAINER_COMPLETED);
+
+ wc.containerCompleted(); // Duplicate completion event.
+ wc.verifyNoOutgoingEvents();
+ }
+
+
+ // TODO Verify diagnostics in most of the tests.
+
+ private static class WrappedContainer { // Test fixture: wraps an AMContainerImpl built on mocked collaborators and offers helpers that feed it events.
+
+ long rmIdentifier = 2000; // Passed to both newApplicationId(...) and newContainer(...) in the constructor.
+ ApplicationId applicationID;
+ ApplicationAttemptId appAttemptID;
+ ContainerId containerID;
+ NodeId nodeID;
+ String nodeHttpAddress;
+ Resource resource;
+ Priority priority;
+ Container container;
+ ContainerHeartbeatHandler chh; // Mockito mock.
+ TaskAttemptListener tal; // Mockito mock; getAddress() is stubbed in the constructor.
+
+ @SuppressWarnings("rawtypes")
+ EventHandler eventHandler; // Mockito mock returned by appContext.getEventHandler(); the verify* helpers inspect its handle(...) calls.
+
+ AppContext appContext; // Mockito mock.
+
+ TezDAGID dagID;
+ TezVertexID vertexID;
+ TezTaskID taskID;
+ TezTaskAttemptID taskAttemptID; // Attempt 1 of taskID; tests build further attempts from taskID themselves.
+
+ TezTaskContext tezTaskContext; // Mockito mock; shared by every assignTaskAttempt(...) call.
+
+ Token<JobTokenIdentifier> jobToken; // Mockito mock.
+
+ public AMContainerImpl amContainer; // The object under test.
+
+ @SuppressWarnings("unchecked")
+ public WrappedContainer() { // Builds the fixed IDs, the mocks and finally the AMContainerImpl under test.
+ applicationID = BuilderUtils.newApplicationId(rmIdentifier, 1);
+ appAttemptID = BuilderUtils.newApplicationAttemptId(applicationID, 1);
+ containerID = BuilderUtils.newContainerId(appAttemptID, 1);
+ nodeID = BuilderUtils.newNodeId("host", 12500);
+ nodeHttpAddress = "host:12501";
+ resource = BuilderUtils.newResource(1024, 1);
+ priority = BuilderUtils.newPriority(1);
+ container = BuilderUtils.newContainer(containerID, nodeID,
+ nodeHttpAddress, resource, priority, null, rmIdentifier);
+ chh = mock(ContainerHeartbeatHandler.class);
+
+ InetSocketAddress addr = new InetSocketAddress("localhost", 0); // Stub address handed out by the mocked listener.
+ tal = mock(TaskAttemptListener.class);
+ doReturn(addr).when(tal).getAddress();
+
+ eventHandler = mock(EventHandler.class);
+
+ appContext = mock(AppContext.class);
+ doReturn(new HashMap<ApplicationAccessType, String>()).when(appContext)
+ .getApplicationACLs(); // Empty ACL map.
+ doReturn(eventHandler).when(appContext).getEventHandler();
+
+ dagID = new TezDAGID(applicationID, 1);
+ vertexID = new TezVertexID(dagID, 1);
+ taskID = new TezTaskID(vertexID, 1);
+ taskAttemptID = new TezTaskAttemptID(taskID, 1);
+
+ tezTaskContext = mock(TezTaskContext.class);
+ doReturn(taskAttemptID).when(tezTaskContext).getTaskAttemptId(); // Always reports attempt 1, even when a different taID is assigned later.
+
+
+ jobToken = (Token<JobTokenIdentifier>) mock(Token.class); // Unchecked cast of the raw mock; reason for the @SuppressWarnings above.
+
+ amContainer = new AMContainerImpl(container, chh, tal,
+ appContext);
+ }
+
+ /**
+ * Verifies no additional outgoing events generated by the last incoming
+ * event to the AMContainer.
+ */
+ @SuppressWarnings("unchecked")
+ public void verifyNoOutgoingEvents() {
+ verify(eventHandler, never()).handle(any(Event.class)); // Relies on the event helpers calling reset(eventHandler) before each incoming event.
+ }
+
+ /**
+ * Returns a list of outgoing events generated by the last incoming event to
+ * the AMContainer.
+ * @param invocations number of expected invocations.
+ *
+ * @return a list of outgoing events from the AMContainer.
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public List<Event> verifyCountAndGetOutgoingEvents(int invocations) {
+ ArgumentCaptor<Event> args = ArgumentCaptor.forClass(Event.class);
+ verify(eventHandler, times(invocations)).handle(args.capture()); // Fails unless handle(...) was called exactly 'invocations' times since the last reset.
+ return args.getAllValues();
+ }
+
+ public void launchContainer() { // Sends a launch request for this container to the AMContainer.
+ reset(eventHandler); // Every helper below resets the mock first so verifications only see events caused by that one call.
+ amContainer.handle(new AMContainerEventLaunchRequest(containerID, vertexID,
+ jobToken, new Credentials(), false, new TezConfiguration(),
+ new HashMap<String, LocalResource>(), new HashMap<String, String>(),
+ null));
+ }
+
+ public void assignTaskAttempt(TezTaskAttemptID taID) { // Assigns taID to the container, always with the shared tezTaskContext.
+ reset(eventHandler);
+ amContainer.handle(new AMContainerEventAssignTA(containerID, taID,
+ tezTaskContext));
+ }
+
+ public AMContainerTask pullTaskToRun() { // Direct call to pullTaskContext() rather than an event; returns what the container hands out.
+ reset(eventHandler);
+ return amContainer.pullTaskContext();
+ }
+
+ public void containerLaunched() { // Reports the container as launched.
+ reset(eventHandler);
+ amContainer.handle(new AMContainerEventLaunched(containerID, 3000)); // NOTE(review): meaning of 3000 is not visible here; presumably a port - confirm.
+ }
+
+ public void taskAttemptSucceeded(TezTaskAttemptID taID) { // Reports taID as succeeded on this container.
+ reset(eventHandler);
+ amContainer.handle(new AMContainerEventTASucceeded(containerID, taID));
+ }
+
+ public void stopRequest() { // Sends C_STOP_REQUEST.
+ reset(eventHandler);
+ amContainer.handle(new AMContainerEvent(containerID,
+ AMContainerEventType.C_STOP_REQUEST));
+ }
+
+ public void nmStopSent() { // Sends C_NM_STOP_SENT.
+ reset(eventHandler);
+ amContainer.handle(new AMContainerEvent(containerID,
+ AMContainerEventType.C_NM_STOP_SENT));
+ }
+
+ public void nmStopFailed() { // Sends C_NM_STOP_FAILED.
+ reset(eventHandler);
+ amContainer.handle(new AMContainerEvent(containerID,
+ AMContainerEventType.C_NM_STOP_FAILED));
+ }
+
+ public void containerCompleted() { // Sends a completed event carrying a COMPLETE container status.
+ reset(eventHandler);
+ ContainerStatus cStatus = ContainerStatus.newInstance(containerID,
+ ContainerState.COMPLETE, "", 100); // NOTE(review): "" and 100 are presumably diagnostics and exit status - confirm against ContainerStatus.
+ amContainer.handle(new AMContainerEventCompleted(cStatus));
+ }
+
+ public void containerTimedOut() { // Sends C_TIMED_OUT.
+ reset(eventHandler);
+ amContainer.handle(new AMContainerEvent(containerID,
+ AMContainerEventType.C_TIMED_OUT));
+ }
+
+ public void launchFailed() { // Sends a launch-failed event with the message "launchFailed".
+ reset(eventHandler);
+ amContainer.handle(new AMContainerEventLaunchFailed(containerID,
+ "launchFailed"));
+ }
+
+ public void nodeFailed() { // Sends a node-failed event with the message "nodeFailed", which tests assert on.
+ reset(eventHandler);
+ amContainer.handle(new AMContainerEventNodeFailed(containerID,
+ "nodeFailed"));
+ }
+
+ public void verifyState(AMContainerState state) { // Asserts that the AMContainer is currently in the given state.
+ assertEquals(
+ "Expected state: " + state + ", but found: " + amContainer.getState(),
+ state, amContainer.getState());
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ private void verifyUnOrderedOutgoingEventTypes(List<Event> events, // Asserts the event types equal expectedTypes as a multiset, ignoring order.
+ Enum<?>... expectedTypes) {
+
+ List<Enum<?>> expectedTypeList = new LinkedList<Enum<?>>(); // Mutable copy; matched types are removed as they are found.
+ for (Enum<?> expectedType : expectedTypes) {
+ expectedTypeList.add(expectedType);
+ }
+ List<Event> eventsCopy = new LinkedList<Event>(events); // Copy so the caller's list is left untouched.
+
+ Iterator<Enum<?>> expectedTypeIterator = expectedTypeList.iterator();
+ while (expectedTypeIterator.hasNext()) {
+ Enum<?> expectedType = expectedTypeIterator.next();
+ Iterator<Event> iter = eventsCopy.iterator();
+ while (iter.hasNext()) {
+ Event e = iter.next();
+ if (e.getType() == expectedType) { // Identity comparison of the event type against the expected constant.
+ iter.remove(); // Consume exactly one event per expected type so duplicates must be matched by duplicates.
+ expectedTypeIterator.remove();
+ break;
+ }
+ }
+ }
+ assertTrue("Did not find types : " + expectedTypeList // Anything left here is an expected type with no matching event.
+ + " in outgoing event list", expectedTypeList.isEmpty());
+ assertTrue("Found unexpected events: " + eventsCopy // Anything left here is an event nobody expected.
+ + " in outgoing event list", eventsCopy.isEmpty());
+ }
+}