You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/12 20:00:08 UTC

git commit: TEZ-831. Reduce line length of state machines (bikas)

Updated Branches:
  refs/heads/master fea8ffa8d -> 54e232211


TEZ-831. Reduce line length of state machines (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/54e23221
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/54e23221
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/54e23221

Branch: refs/heads/master
Commit: 54e232211b0bc11889c87bfddb484bda1f60295f
Parents: fea8ffa
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 12 10:59:53 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 12 10:59:53 2014 -0800

----------------------------------------------------------------------
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 346 +++++++++++++++----
 .../dag/app/rm/container/AMContainerImpl.java   | 240 ++++++++++---
 .../apache/tez/dag/app/rm/node/AMNodeImpl.java  | 117 +++++--
 3 files changed, 553 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/54e23221/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 02cbe38..d555e04 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -177,70 +177,288 @@ public class TaskAttemptImpl implements TaskAttempt,
             <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
             (TaskAttemptStateInternal.NEW)
 
-        .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.START_WAIT, TaskAttemptEventType.TA_SCHEDULE, new ScheduleTaskattemptTransition())
-        .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.NEW, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, new TerminateTransition(FAILED_HELPER))
-        .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL_REQUEST, new TerminateTransition(KILLED_HELPER))
-
-        .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_STARTED_REMOTELY, new StartedTransition())
-        .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.START_WAIT, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, new TerminatedBeforeRunningTransition(FAILED_HELPER))
-        .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedBeforeRunningTransition(KILLED_HELPER))
-        .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new NodeFailedBeforeRunningTransition())
-        .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new ContainerTerminatingBeforeRunningTransition())
-        .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition())
-        .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
-
-        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
-        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, new OutputConsumableTransition()) //Optional, may not come in for all tasks.
-        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE, new SucceededTransition())
-        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAILED, new TerminatedWhileRunningTransition(FAILED_HELPER))
-        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT, new TerminatedWhileRunningTransition(FAILED_HELPER))
-        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, new TerminatedWhileRunningTransition(FAILED_HELPER))
-        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedWhileRunningTransition(KILLED_HELPER))
-        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
-        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
-        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileRunningTransition())
-        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, new ContainerCompletedWhileRunningTransition(KILLED_HELPER))
-        .addTransition(TaskAttemptStateInternal.RUNNING, EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.RUNNING), TaskAttemptEventType.TA_OUTPUT_FAILED, new OutputReportedFailedTransition())
-
-        .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
-        .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE) // Stuck RPC. The client retries in a loop.
-        .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE, new SucceededTransition())
-        .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAILED,  new TerminatedWhileRunningTransition(FAILED_HELPER))
-        .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT,  new TerminatedWhileRunningTransition(FAILED_HELPER))
-        .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST,  new TerminatedWhileRunningTransition(FAILED_HELPER))
-        // TODO CREUSE Ensure TaskCompletionEvents are updated to reflect this. Something needs to go out to the job.
-        .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedWhileRunningTransition(KILLED_HELPER))
-        .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
-        .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
-        .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition())
-        .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
-        .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.OUTPUT_CONSUMABLE), TaskAttemptEventType.TA_OUTPUT_FAILED, new OutputReportedFailedTransition())
-
-        .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileTerminating())
-        .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED))
-
-        .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileTerminating())
-        .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED))
-
-        .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_SCHEDULE, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
-
-        .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_SCHEDULE, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
-
-        // How will duplicate history events be handled ?
-        // TODO Maybe consider not failing REDUCE tasks in this case. Also, MAP_TASKS in case there's only one phase in the job.
-        .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.SUCCEEDED), TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedAfterSuccessTransition())
-        .addTransition(TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.SUCCEEDED), TaskAttemptEventType.TA_NODE_FAILED, new TerminatedAfterSuccessTransition())
-        .addTransition(TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.SUCCEEDED), TaskAttemptEventType.TA_OUTPUT_FAILED, new OutputReportedFailedTransition())
-        .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_CONTAINER_PREEMPTED))
+      .addTransition(TaskAttemptStateInternal.NEW,
+          TaskAttemptStateInternal.START_WAIT,
+          TaskAttemptEventType.TA_SCHEDULE, new ScheduleTaskattemptTransition())
+      .addTransition(TaskAttemptStateInternal.NEW,
+          TaskAttemptStateInternal.NEW,
+          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+      .addTransition(TaskAttemptStateInternal.NEW,
+          TaskAttemptStateInternal.FAILED,
+          TaskAttemptEventType.TA_FAIL_REQUEST,
+          new TerminateTransition(FAILED_HELPER))
+      .addTransition(TaskAttemptStateInternal.NEW,
+          TaskAttemptStateInternal.KILLED,
+          TaskAttemptEventType.TA_KILL_REQUEST,
+          new TerminateTransition(KILLED_HELPER))
+
+      .addTransition(TaskAttemptStateInternal.START_WAIT,
+          TaskAttemptStateInternal.RUNNING,
+          TaskAttemptEventType.TA_STARTED_REMOTELY, new StartedTransition())
+      .addTransition(TaskAttemptStateInternal.START_WAIT,
+          TaskAttemptStateInternal.START_WAIT,
+          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+      .addTransition(TaskAttemptStateInternal.START_WAIT,
+          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+          TaskAttemptEventType.TA_FAIL_REQUEST,
+          new TerminatedBeforeRunningTransition(FAILED_HELPER))
+      .addTransition(TaskAttemptStateInternal.START_WAIT,
+          TaskAttemptStateInternal.KILL_IN_PROGRESS,
+          TaskAttemptEventType.TA_KILL_REQUEST,
+          new TerminatedBeforeRunningTransition(KILLED_HELPER))
+      .addTransition(TaskAttemptStateInternal.START_WAIT,
+          TaskAttemptStateInternal.KILL_IN_PROGRESS,
+          TaskAttemptEventType.TA_NODE_FAILED,
+          new NodeFailedBeforeRunningTransition())
+      .addTransition(TaskAttemptStateInternal.START_WAIT,
+          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+          TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+          new ContainerTerminatingBeforeRunningTransition())
+      .addTransition(TaskAttemptStateInternal.START_WAIT,
+          TaskAttemptStateInternal.FAILED,
+          TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+          new ContainerCompletedBeforeRunningTransition())
+      .addTransition(TaskAttemptStateInternal.START_WAIT,
+          TaskAttemptStateInternal.KILLED,
+          TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+          new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
+
+      .addTransition(TaskAttemptStateInternal.RUNNING,
+          TaskAttemptStateInternal.RUNNING,
+          TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
+      .addTransition(TaskAttemptStateInternal.RUNNING,
+          TaskAttemptStateInternal.RUNNING,
+          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+      .addTransition(TaskAttemptStateInternal.RUNNING,
+          TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+          TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
+          new OutputConsumableTransition())
+      // TA_OUTPUT_CONSUMABLE above is optional; may not come in for all tasks.
+      .addTransition(TaskAttemptStateInternal.RUNNING,
+          TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE,
+          new SucceededTransition())
+      .addTransition(TaskAttemptStateInternal.RUNNING,
+          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+          TaskAttemptEventType.TA_FAILED,
+          new TerminatedWhileRunningTransition(FAILED_HELPER))
+      .addTransition(TaskAttemptStateInternal.RUNNING,
+          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+          TaskAttemptEventType.TA_TIMED_OUT,
+          new TerminatedWhileRunningTransition(FAILED_HELPER))
+      .addTransition(TaskAttemptStateInternal.RUNNING,
+          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+          TaskAttemptEventType.TA_FAIL_REQUEST,
+          new TerminatedWhileRunningTransition(FAILED_HELPER))
+      .addTransition(TaskAttemptStateInternal.RUNNING,
+          TaskAttemptStateInternal.KILL_IN_PROGRESS,
+          TaskAttemptEventType.TA_KILL_REQUEST,
+          new TerminatedWhileRunningTransition(KILLED_HELPER))
+      .addTransition(TaskAttemptStateInternal.RUNNING,
+          TaskAttemptStateInternal.KILL_IN_PROGRESS,
+          TaskAttemptEventType.TA_NODE_FAILED,
+          new TerminatedWhileRunningTransition(KILLED_HELPER))
+      .addTransition(TaskAttemptStateInternal.RUNNING,
+          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+          TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+          new TerminatedWhileRunningTransition(FAILED_HELPER))
+      .addTransition(TaskAttemptStateInternal.RUNNING,
+          TaskAttemptStateInternal.FAILED,
+          TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+          new ContainerCompletedWhileRunningTransition())
+      .addTransition(TaskAttemptStateInternal.RUNNING,
+          TaskAttemptStateInternal.KILLED,
+          TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+          new ContainerCompletedWhileRunningTransition(KILLED_HELPER))
+      .addTransition(
+          TaskAttemptStateInternal.RUNNING,
+          EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+              TaskAttemptStateInternal.RUNNING),
+          TaskAttemptEventType.TA_OUTPUT_FAILED,
+          new OutputReportedFailedTransition())
+
+      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+          TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+          TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
+      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+          TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+          TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+          TaskAttemptEventType.TA_OUTPUT_CONSUMABLE)
+      // Self-transition above: stuck RPC. The client retries in a loop.
+      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+          TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE,
+          new SucceededTransition())
+      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+          TaskAttemptEventType.TA_FAILED,
+          new TerminatedWhileRunningTransition(FAILED_HELPER))
+      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+          TaskAttemptEventType.TA_TIMED_OUT,
+          new TerminatedWhileRunningTransition(FAILED_HELPER))
+      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+          TaskAttemptEventType.TA_FAIL_REQUEST,
+          new TerminatedWhileRunningTransition(FAILED_HELPER))
+      // TODO CREUSE Ensure TaskCompletionEvents are updated to reflect this.
+      // Something needs to go out to the job.
+      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+          TaskAttemptStateInternal.KILL_IN_PROGRESS,
+          TaskAttemptEventType.TA_KILL_REQUEST,
+          new TerminatedWhileRunningTransition(KILLED_HELPER))
+      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+          TaskAttemptStateInternal.KILL_IN_PROGRESS,
+          TaskAttemptEventType.TA_NODE_FAILED,
+          new TerminatedWhileRunningTransition(KILLED_HELPER))
+      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+          TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+          new TerminatedWhileRunningTransition(FAILED_HELPER))
+      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+          TaskAttemptStateInternal.FAILED,
+          TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+          new ContainerCompletedBeforeRunningTransition())
+      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+          TaskAttemptStateInternal.KILLED,
+          TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+          new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
+      .addTransition(
+          TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+          EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+              TaskAttemptStateInternal.OUTPUT_CONSUMABLE),
+          TaskAttemptEventType.TA_OUTPUT_FAILED,
+          new OutputReportedFailedTransition())
+
+      .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS,
+          TaskAttemptStateInternal.KILLED,
+          TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+          new ContainerCompletedWhileTerminating())
+      .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS,
+          TaskAttemptStateInternal.KILL_IN_PROGRESS,
+          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+      .addTransition(
+          TaskAttemptStateInternal.KILL_IN_PROGRESS,
+          TaskAttemptStateInternal.KILL_IN_PROGRESS,
+          EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
+              TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+              TaskAttemptEventType.TA_STATUS_UPDATE,
+              TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
+              TaskAttemptEventType.TA_COMMIT_PENDING,
+              TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
+              TaskAttemptEventType.TA_TIMED_OUT,
+              TaskAttemptEventType.TA_FAIL_REQUEST,
+              TaskAttemptEventType.TA_KILL_REQUEST,
+              TaskAttemptEventType.TA_NODE_FAILED,
+              TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+              TaskAttemptEventType.TA_OUTPUT_FAILED))
+
+      .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+          TaskAttemptStateInternal.FAILED,
+          TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+          new ContainerCompletedWhileTerminating())
+      .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+      .addTransition(
+          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+          EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
+              TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+              TaskAttemptEventType.TA_STATUS_UPDATE,
+              TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
+              TaskAttemptEventType.TA_COMMIT_PENDING,
+              TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
+              TaskAttemptEventType.TA_TIMED_OUT,
+              TaskAttemptEventType.TA_FAIL_REQUEST,
+              TaskAttemptEventType.TA_KILL_REQUEST,
+              TaskAttemptEventType.TA_NODE_FAILED,
+              TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+              TaskAttemptEventType.TA_OUTPUT_FAILED))
+
+      .addTransition(TaskAttemptStateInternal.KILLED,
+          TaskAttemptStateInternal.KILLED,
+          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+      .addTransition(
+          TaskAttemptStateInternal.KILLED,
+          TaskAttemptStateInternal.KILLED,
+          EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
+              TaskAttemptEventType.TA_SCHEDULE,
+              TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+              TaskAttemptEventType.TA_STATUS_UPDATE,
+              TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
+              TaskAttemptEventType.TA_COMMIT_PENDING,
+              TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
+              TaskAttemptEventType.TA_TIMED_OUT,
+              TaskAttemptEventType.TA_FAIL_REQUEST,
+              TaskAttemptEventType.TA_KILL_REQUEST,
+              TaskAttemptEventType.TA_NODE_FAILED,
+              TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+              TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+              TaskAttemptEventType.TA_OUTPUT_FAILED))
+
+      .addTransition(TaskAttemptStateInternal.FAILED,
+          TaskAttemptStateInternal.FAILED,
+          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+      .addTransition(
+          TaskAttemptStateInternal.FAILED,
+          TaskAttemptStateInternal.FAILED,
+          EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
+              TaskAttemptEventType.TA_SCHEDULE,
+              TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+              TaskAttemptEventType.TA_STATUS_UPDATE,
+              TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
+              TaskAttemptEventType.TA_COMMIT_PENDING,
+              TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
+              TaskAttemptEventType.TA_TIMED_OUT,
+              TaskAttemptEventType.TA_FAIL_REQUEST,
+              TaskAttemptEventType.TA_KILL_REQUEST,
+              TaskAttemptEventType.TA_NODE_FAILED,
+              TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+              TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+              TaskAttemptEventType.TA_OUTPUT_FAILED))
+
+      // How will duplicate history events be handled ?
+      // TODO Maybe consider not failing REDUCE tasks in this case. Also,
+      // MAP_TASKS in case there's only one phase in the job.
+      .addTransition(TaskAttemptStateInternal.SUCCEEDED,
+          TaskAttemptStateInternal.SUCCEEDED,
+          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+      .addTransition(
+          TaskAttemptStateInternal.SUCCEEDED,
+          EnumSet.of(TaskAttemptStateInternal.KILLED,
+              TaskAttemptStateInternal.SUCCEEDED),
+          TaskAttemptEventType.TA_KILL_REQUEST,
+          new TerminatedAfterSuccessTransition())
+      .addTransition(
+          TaskAttemptStateInternal.SUCCEEDED,
+          EnumSet.of(TaskAttemptStateInternal.KILLED,
+              TaskAttemptStateInternal.SUCCEEDED),
+          TaskAttemptEventType.TA_NODE_FAILED,
+          new TerminatedAfterSuccessTransition())
+      .addTransition(
+          TaskAttemptStateInternal.SUCCEEDED,
+          EnumSet.of(TaskAttemptStateInternal.FAILED,
+              TaskAttemptStateInternal.SUCCEEDED),
+          TaskAttemptEventType.TA_OUTPUT_FAILED,
+          new OutputReportedFailedTransition())
+      .addTransition(
+          TaskAttemptStateInternal.SUCCEEDED,
+          TaskAttemptStateInternal.SUCCEEDED,
+          EnumSet.of(TaskAttemptEventType.TA_TIMED_OUT,
+              TaskAttemptEventType.TA_FAIL_REQUEST,
+              TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+              TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+              TaskAttemptEventType.TA_CONTAINER_PREEMPTED))
 
         .installTopology();
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/54e23221/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 95cb69d..1ce0d89 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -140,61 +140,191 @@ public class AMContainerImpl implements AMContainer {
       new StateMachineFactory<AMContainerImpl, AMContainerState, AMContainerEventType, AMContainerEvent>(
       AMContainerState.ALLOCATED)
 
-        .addTransition(AMContainerState.ALLOCATED, AMContainerState.LAUNCHING, AMContainerEventType.C_LAUNCH_REQUEST, new LaunchRequestTransition())
-        .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtAllocatedTransition())
-        .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtAllocatedTransition())
-        .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtAllocatedTransition())
-        .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtAllocatedTransition())
-        .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), new ErrorTransition())
-
-        .addTransition(AMContainerState.LAUNCHING, EnumSet.of(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED), AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptTransition())
-        .addTransition(AMContainerState.LAUNCHING, AMContainerState.IDLE, AMContainerEventType.C_LAUNCHED, new LaunchedTransition())
-        .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_FAILED, new LaunchFailedTransition())
-        // TODO CREUSE : Maybe, consider sending back an attempt if the container asks for one in this state. Waiting for a LAUNCHED event from the NMComm may delay the task allocation.
-        .addTransition(AMContainerState.LAUNCHING, AMContainerState.LAUNCHING, AMContainerEventType.C_PULL_TA) // Is assuming the pullAttempt will be null.
-        .addTransition(AMContainerState.LAUNCHING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtLaunchingTransition())
-        .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtLaunchingTransition())
-        .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtLaunchingTransition())
-        .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), new ErrorAtLaunchingTransition())
-
-        .addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED), AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtIdleTransition())
-        .addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.RUNNING, AMContainerState.IDLE), AMContainerEventType.C_PULL_TA, new PullTAAtIdleTransition())
-        .addTransition(AMContainerState.IDLE, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtIdleTransition())
-        .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtIdleTransition())
-        .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, new TimedOutAtIdleTransition())
-        .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtIdleTransition())
-        .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), new ErrorAtIdleTransition())
-
-        .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtRunningTransition())
-        .addTransition(AMContainerState.RUNNING, AMContainerState.RUNNING, AMContainerEventType.C_PULL_TA)
-        .addTransition(AMContainerState.RUNNING, AMContainerState.IDLE, AMContainerEventType.C_TA_SUCCEEDED, new TASucceededAtRunningTransition())
-        .addTransition(AMContainerState.RUNNING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtRunningTransition())
-        .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtRunningTransition())
-        .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, new TimedOutAtRunningTransition())
-        .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtRunningTransition())
-        .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), new ErrorAtRunningTransition())
-
-        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
-        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
-        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition())
-        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_SENT)
-        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_FAILED, new NMStopRequestFailedTransition())
-        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtNMStopRequestedTransition())
-        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TIMED_OUT))
-        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_LAUNCH_REQUEST, new ErrorAtNMStopRequestedTransition())
-
-        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
-        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
-        // TODO This transition is wrong. Should be a noop / error.
-        .addTransition(AMContainerState.STOPPING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition())
-        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
-        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
-        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_REQUEST, new ErrorAtStoppingTransition())
-
-        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtCompletedTransition())
-        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
-        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
-        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_COMPLETED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
+      .addTransition(AMContainerState.ALLOCATED, AMContainerState.LAUNCHING,
+          AMContainerEventType.C_LAUNCH_REQUEST, new LaunchRequestTransition())
+      .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED,
+          AMContainerEventType.C_ASSIGN_TA,
+          new AssignTaskAttemptAtAllocatedTransition())
+      .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED,
+          AMContainerEventType.C_COMPLETED,
+          new CompletedAtAllocatedTransition())
+      .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED,
+          AMContainerEventType.C_STOP_REQUEST,
+          new StopRequestAtAllocatedTransition())
+      .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED,
+          AMContainerEventType.C_NODE_FAILED,
+          new NodeFailedAtAllocatedTransition())
+      .addTransition(
+          AMContainerState.ALLOCATED,
+          AMContainerState.COMPLETED,
+          EnumSet.of(AMContainerEventType.C_LAUNCHED,
+              AMContainerEventType.C_LAUNCH_FAILED,
+              AMContainerEventType.C_PULL_TA,
+              AMContainerEventType.C_TA_SUCCEEDED,
+              AMContainerEventType.C_NM_STOP_SENT,
+              AMContainerEventType.C_NM_STOP_FAILED,
+              AMContainerEventType.C_TIMED_OUT), new ErrorTransition())
+
+      .addTransition(
+          AMContainerState.LAUNCHING,
+          EnumSet.of(AMContainerState.LAUNCHING,
+              AMContainerState.STOP_REQUESTED),
+          AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptTransition())
+      .addTransition(AMContainerState.LAUNCHING, AMContainerState.IDLE,
+          AMContainerEventType.C_LAUNCHED, new LaunchedTransition())
+      .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING,
+          AMContainerEventType.C_LAUNCH_FAILED, new LaunchFailedTransition())
+      // TODO CREUSE : Maybe, consider sending back an attempt if the container
+      // asks for one in this state. Waiting for a LAUNCHED event from the
+      // NMComm may delay the task allocation.
+      .addTransition(AMContainerState.LAUNCHING, AMContainerState.LAUNCHING,
+          AMContainerEventType.C_PULL_TA)
+      // The C_PULL_TA transition above assumes the pullAttempt will be null.
+      .addTransition(AMContainerState.LAUNCHING, AMContainerState.COMPLETED,
+          AMContainerEventType.C_COMPLETED,
+          new CompletedAtLaunchingTransition())
+      .addTransition(AMContainerState.LAUNCHING,
+          AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST,
+          new StopRequestAtLaunchingTransition())
+      .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING,
+          AMContainerEventType.C_NODE_FAILED,
+          new NodeFailedAtLaunchingTransition())
+      .addTransition(
+          AMContainerState.LAUNCHING,
+          AMContainerState.STOP_REQUESTED,
+          EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST,
+              AMContainerEventType.C_TA_SUCCEEDED,
+              AMContainerEventType.C_NM_STOP_SENT,
+              AMContainerEventType.C_NM_STOP_FAILED,
+              AMContainerEventType.C_TIMED_OUT),
+          new ErrorAtLaunchingTransition())
+
+      .addTransition(AMContainerState.IDLE,
+          EnumSet.of(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED),
+          AMContainerEventType.C_ASSIGN_TA,
+          new AssignTaskAttemptAtIdleTransition())
+      .addTransition(AMContainerState.IDLE,
+          EnumSet.of(AMContainerState.RUNNING, AMContainerState.IDLE),
+          AMContainerEventType.C_PULL_TA, new PullTAAtIdleTransition())
+      .addTransition(AMContainerState.IDLE, AMContainerState.COMPLETED,
+          AMContainerEventType.C_COMPLETED, new CompletedAtIdleTransition())
+      .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED,
+          AMContainerEventType.C_STOP_REQUEST,
+          new StopRequestAtIdleTransition())
+      .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED,
+          AMContainerEventType.C_TIMED_OUT, new TimedOutAtIdleTransition())
+      .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING,
+          AMContainerEventType.C_NODE_FAILED, new NodeFailedAtIdleTransition())
+      .addTransition(
+          AMContainerState.IDLE,
+          AMContainerState.STOP_REQUESTED,
+          EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST,
+              AMContainerEventType.C_LAUNCHED,
+              AMContainerEventType.C_LAUNCH_FAILED,
+              AMContainerEventType.C_TA_SUCCEEDED,
+              AMContainerEventType.C_NM_STOP_SENT,
+              AMContainerEventType.C_NM_STOP_FAILED),
+          new ErrorAtIdleTransition())
+
+      .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED,
+          AMContainerEventType.C_ASSIGN_TA,
+          new AssignTaskAttemptAtRunningTransition())
+      .addTransition(AMContainerState.RUNNING, AMContainerState.RUNNING,
+          AMContainerEventType.C_PULL_TA)
+      .addTransition(AMContainerState.RUNNING, AMContainerState.IDLE,
+          AMContainerEventType.C_TA_SUCCEEDED,
+          new TASucceededAtRunningTransition())
+      .addTransition(AMContainerState.RUNNING, AMContainerState.COMPLETED,
+          AMContainerEventType.C_COMPLETED, new CompletedAtRunningTransition())
+      .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED,
+          AMContainerEventType.C_STOP_REQUEST,
+          new StopRequestAtRunningTransition())
+      .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED,
+          AMContainerEventType.C_TIMED_OUT, new TimedOutAtRunningTransition())
+      .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING,
+          AMContainerEventType.C_NODE_FAILED,
+          new NodeFailedAtRunningTransition())
+      .addTransition(
+          AMContainerState.RUNNING,
+          AMContainerState.STOP_REQUESTED,
+          EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST,
+              AMContainerEventType.C_LAUNCHED,
+              AMContainerEventType.C_LAUNCH_FAILED,
+              AMContainerEventType.C_NM_STOP_SENT,
+              AMContainerEventType.C_NM_STOP_FAILED),
+          new ErrorAtRunningTransition())
+
+      .addTransition(AMContainerState.STOP_REQUESTED,
+          AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA,
+          new AssignTAAtWindDownTransition())
+      .addTransition(AMContainerState.STOP_REQUESTED,
+          AMContainerState.STOP_REQUESTED, AMContainerEventType.C_PULL_TA,
+          new PullTAAfterStopTransition())
+      .addTransition(AMContainerState.STOP_REQUESTED,
+          AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED,
+          new CompletedAtWindDownTransition())
+      .addTransition(AMContainerState.STOP_REQUESTED,
+          AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_SENT)
+      .addTransition(AMContainerState.STOP_REQUESTED,
+          AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_FAILED,
+          new NMStopRequestFailedTransition())
+      .addTransition(AMContainerState.STOP_REQUESTED,
+          AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED,
+          new NodeFailedAtNMStopRequestedTransition())
+      .addTransition(
+          AMContainerState.STOP_REQUESTED,
+          AMContainerState.STOP_REQUESTED,
+          EnumSet.of(AMContainerEventType.C_LAUNCHED,
+              AMContainerEventType.C_LAUNCH_FAILED,
+              AMContainerEventType.C_TA_SUCCEEDED,
+              AMContainerEventType.C_STOP_REQUEST,
+              AMContainerEventType.C_TIMED_OUT))
+      .addTransition(AMContainerState.STOP_REQUESTED,
+          AMContainerState.STOP_REQUESTED,
+          AMContainerEventType.C_LAUNCH_REQUEST,
+          new ErrorAtNMStopRequestedTransition())
+
+      .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING,
+          AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
+      .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING,
+          AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
+      // TODO This transition is wrong. Should be a noop / error.
+      .addTransition(AMContainerState.STOPPING, AMContainerState.COMPLETED,
+          AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition())
+      .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING,
+          AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
+      .addTransition(
+          AMContainerState.STOPPING,
+          AMContainerState.STOPPING,
+          EnumSet.of(AMContainerEventType.C_LAUNCHED,
+              AMContainerEventType.C_LAUNCH_FAILED,
+              AMContainerEventType.C_TA_SUCCEEDED,
+              AMContainerEventType.C_STOP_REQUEST,
+              AMContainerEventType.C_NM_STOP_SENT,
+              AMContainerEventType.C_NM_STOP_FAILED,
+              AMContainerEventType.C_TIMED_OUT))
+      .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING,
+          AMContainerEventType.C_LAUNCH_REQUEST,
+          new ErrorAtStoppingTransition())
+
+      .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED,
+          AMContainerEventType.C_ASSIGN_TA, new AssignTAAtCompletedTransition())
+      .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED,
+          AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
+      .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED,
+          AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
+      .addTransition(
+          AMContainerState.COMPLETED,
+          AMContainerState.COMPLETED,
+          EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST,
+              AMContainerEventType.C_LAUNCHED,
+              AMContainerEventType.C_LAUNCH_FAILED,
+              AMContainerEventType.C_TA_SUCCEEDED,
+              AMContainerEventType.C_COMPLETED,
+              AMContainerEventType.C_STOP_REQUEST,
+              AMContainerEventType.C_NM_STOP_SENT,
+              AMContainerEventType.C_NM_STOP_FAILED,
+              AMContainerEventType.C_TIMED_OUT))
 
         .installTopology();
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/54e23221/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index a79358b..9ee8a3c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -74,37 +74,92 @@ public class AMNodeImpl implements AMNode {
   new StateMachineFactory<AMNodeImpl, AMNodeState, AMNodeEventType, AMNodeEvent>(
   AMNodeState.ACTIVE)
         // Transitions from ACTIVE state.
-    .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE, AMNodeEventType.N_CONTAINER_ALLOCATED, new ContainerAllocatedTransition())
-    .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE, AMNodeEventType.N_TA_SUCCEEDED, new TaskAttemptSucceededTransition())
-    .addTransition(AMNodeState.ACTIVE, EnumSet.of(AMNodeState.ACTIVE, AMNodeState.BLACKLISTED), AMNodeEventType.N_TA_ENDED, new TaskAttemptFailedTransition())
-    .addTransition(AMNodeState.ACTIVE, AMNodeState.UNHEALTHY, AMNodeEventType.N_TURNED_UNHEALTHY, new NodeTurnedUnhealthyTransition())
-    .addTransition(AMNodeState.ACTIVE, EnumSet.of(AMNodeState.ACTIVE, AMNodeState.BLACKLISTED), AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, new IgnoreBlacklistingDisabledTransition())
-    .addTransition(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, new IgnoreBlacklistingStateChangeTransition(true))
-    .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE, AMNodeEventType.N_TURNED_HEALTHY)
-
-        // Transitions from BLACKLISTED state.
-    .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED, AMNodeEventType.N_CONTAINER_ALLOCATED, new ContainerAllocatedWhileBlacklistedTransition())
-    .addTransition(AMNodeState.BLACKLISTED, EnumSet.of(AMNodeState.BLACKLISTED, AMNodeState.ACTIVE), AMNodeEventType.N_TA_SUCCEEDED, new TaskAttemptSucceededWhileBlacklistedTransition())
-    .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED, AMNodeEventType.N_TA_ENDED, new CountFailedTaskAttemptTransition())
-    .addTransition(AMNodeState.BLACKLISTED, AMNodeState.UNHEALTHY, AMNodeEventType.N_TURNED_UNHEALTHY, new NodeTurnedUnhealthyTransition())
-    .addTransition(AMNodeState.BLACKLISTED, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, new IgnoreBlacklistingStateChangeTransition(true))
-    .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED, EnumSet.of(AMNodeEventType.N_TURNED_HEALTHY, AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED), new GenericErrorTransition())
-
-        //Transitions from FORCED_ACTIVE state.
-    .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_CONTAINER_ALLOCATED, new ContainerAllocatedTransition())
-    .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_TA_SUCCEEDED, new TaskAttemptSucceededTransition())
-    .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_TA_ENDED, new CountFailedTaskAttemptTransition())
-    .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.UNHEALTHY, AMNodeEventType.N_TURNED_UNHEALTHY, new NodeTurnedUnhealthyTransition())
-    .addTransition(AMNodeState.FORCED_ACTIVE, EnumSet.of(AMNodeState.BLACKLISTED, AMNodeState.ACTIVE), AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, new IgnoreBlacklistingDisabledTransition())
-    .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE, EnumSet.of(AMNodeEventType.N_TURNED_HEALTHY, AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED), new GenericErrorTransition())
-
-        // Transitions from UNHEALTHY state.
-    .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, AMNodeEventType.N_CONTAINER_ALLOCATED, new ContainerAllocatedWhileUnhealthyTransition())
-    .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, EnumSet.of(AMNodeEventType.N_TA_SUCCEEDED, AMNodeEventType.N_TA_ENDED))
-    .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, new IgnoreBlacklistingStateChangeTransition(false))
-    .addTransition(AMNodeState.UNHEALTHY,  AMNodeState.UNHEALTHY, AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, new IgnoreBlacklistingStateChangeTransition(true))
-    .addTransition(AMNodeState.UNHEALTHY, EnumSet.of(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE), AMNodeEventType.N_TURNED_HEALTHY, new NodeTurnedHealthyTransition())
-    .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, AMNodeEventType.N_TURNED_UNHEALTHY, new GenericErrorTransition())
+      .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE,
+          AMNodeEventType.N_CONTAINER_ALLOCATED,
+          new ContainerAllocatedTransition())
+      .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE,
+          AMNodeEventType.N_TA_SUCCEEDED, new TaskAttemptSucceededTransition())
+      .addTransition(AMNodeState.ACTIVE,
+          EnumSet.of(AMNodeState.ACTIVE, AMNodeState.BLACKLISTED),
+          AMNodeEventType.N_TA_ENDED, new TaskAttemptFailedTransition())
+      .addTransition(AMNodeState.ACTIVE, AMNodeState.UNHEALTHY,
+          AMNodeEventType.N_TURNED_UNHEALTHY,
+          new NodeTurnedUnhealthyTransition())
+      .addTransition(AMNodeState.ACTIVE,
+          EnumSet.of(AMNodeState.ACTIVE, AMNodeState.BLACKLISTED),
+          AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED,
+          new IgnoreBlacklistingDisabledTransition())
+      .addTransition(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE,
+          AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED,
+          new IgnoreBlacklistingStateChangeTransition(true))
+      .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE,
+          AMNodeEventType.N_TURNED_HEALTHY)
+
+      // Transitions from BLACKLISTED state.
+      .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED,
+          AMNodeEventType.N_CONTAINER_ALLOCATED,
+          new ContainerAllocatedWhileBlacklistedTransition())
+      .addTransition(AMNodeState.BLACKLISTED,
+          EnumSet.of(AMNodeState.BLACKLISTED, AMNodeState.ACTIVE),
+          AMNodeEventType.N_TA_SUCCEEDED,
+          new TaskAttemptSucceededWhileBlacklistedTransition())
+      .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED,
+          AMNodeEventType.N_TA_ENDED, new CountFailedTaskAttemptTransition())
+      .addTransition(AMNodeState.BLACKLISTED, AMNodeState.UNHEALTHY,
+          AMNodeEventType.N_TURNED_UNHEALTHY,
+          new NodeTurnedUnhealthyTransition())
+      .addTransition(AMNodeState.BLACKLISTED, AMNodeState.FORCED_ACTIVE,
+          AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED,
+          new IgnoreBlacklistingStateChangeTransition(true))
+      .addTransition(
+          AMNodeState.BLACKLISTED,
+          AMNodeState.BLACKLISTED,
+          EnumSet.of(AMNodeEventType.N_TURNED_HEALTHY,
+              AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED),
+          new GenericErrorTransition())
+
+      // Transitions from FORCED_ACTIVE state.
+      .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE,
+          AMNodeEventType.N_CONTAINER_ALLOCATED,
+          new ContainerAllocatedTransition())
+      .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE,
+          AMNodeEventType.N_TA_SUCCEEDED, new TaskAttemptSucceededTransition())
+      .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE,
+          AMNodeEventType.N_TA_ENDED, new CountFailedTaskAttemptTransition())
+      .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.UNHEALTHY,
+          AMNodeEventType.N_TURNED_UNHEALTHY,
+          new NodeTurnedUnhealthyTransition())
+      .addTransition(AMNodeState.FORCED_ACTIVE,
+          EnumSet.of(AMNodeState.BLACKLISTED, AMNodeState.ACTIVE),
+          AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED,
+          new IgnoreBlacklistingDisabledTransition())
+      .addTransition(
+          AMNodeState.FORCED_ACTIVE,
+          AMNodeState.FORCED_ACTIVE,
+          EnumSet.of(AMNodeEventType.N_TURNED_HEALTHY,
+              AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED),
+          new GenericErrorTransition())
+
+      // Transitions from UNHEALTHY state.
+      .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY,
+          AMNodeEventType.N_CONTAINER_ALLOCATED,
+          new ContainerAllocatedWhileUnhealthyTransition())
+      .addTransition(
+          AMNodeState.UNHEALTHY,
+          AMNodeState.UNHEALTHY,
+          EnumSet
+              .of(AMNodeEventType.N_TA_SUCCEEDED, AMNodeEventType.N_TA_ENDED))
+      .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY,
+          AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED,
+          new IgnoreBlacklistingStateChangeTransition(false))
+      .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY,
+          AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED,
+          new IgnoreBlacklistingStateChangeTransition(true))
+      .addTransition(AMNodeState.UNHEALTHY,
+          EnumSet.of(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE),
+          AMNodeEventType.N_TURNED_HEALTHY, new NodeTurnedHealthyTransition())
+      .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY,
+          AMNodeEventType.N_TURNED_UNHEALTHY, new GenericErrorTransition())
 
         .installTopology();