You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC

svn commit: r1469642 [18/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ t...

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventType.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventType.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventType.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,56 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm.container;
+
+/**
+ * Event types accepted by an AMContainer.
+ *
+ * The constants are grouped by the component that produces the event, as
+ * noted in the comment above each group. Abbreviations used in the names and
+ * comments: TA = TaskAttempt, TAL = TaskAttemptListener.
+ */
+public enum AMContainerEventType {
+
+  //Producer: Scheduler
+  C_LAUNCH_REQUEST,
+  C_ASSIGN_TA,
+  
+  //Producer: NMCommunicator
+  C_LAUNCHED,
+  C_LAUNCH_FAILED,
+
+  //Producer: TAL: PULL_TA is a sync call.
+  C_PULL_TA,
+
+  //Producer: Scheduler via TA
+  C_TA_SUCCEEDED, // maybe change this to C_TA_FINISHED with a status.
+
+  //Producer: RMCommunicator
+  C_COMPLETED,
+  
+  //Producer: RMCommunicator, AMNode
+  C_NODE_FAILED,
+  
+  //TODO ZZZ CREUSE: Consider introducing a new event C_NODE_BLACKLISTED -> container can take a call on what to do if this event comes in.
+  
+  //Producer: TA-> Scheduler -> Container (in case of failure etc)
+  //          Scheduler -> Container (in case of pre-emption etc)
+  //          Node -> Container (in case of Node blacklisted etc)
+  C_STOP_REQUEST,
+  
+  //Producer: NMCommunicator
+  // Outcome of a stop request sent through the NMCommunicator.
+  C_NM_STOP_FAILED,
+  C_NM_STOP_SENT,
+  
+  //Producer: ContainerHeartbeatHandler
+  C_TIMED_OUT,
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventType.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,205 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm.container;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceChildJVM2;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
+import org.apache.tez.engine.records.TezVertexID;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Static helpers for building the {@link ContainerLaunchContext} used to
+ * launch a container.
+ *
+ * The parts of the launch context that do not change between containers
+ * (user, service data, serialized credentials) are built once, on the first
+ * call to {@link #createContainerLaunchContext}, cached in a static field and
+ * reused for every later call. Note that the cached values therefore come
+ * from the arguments of that first call.
+ */
+public class AMContainerHelpers {
+
+  private static final Log LOG = LogFactory.getLog(AMContainerHelpers.class);
+
+  // Monitor guarding lazy creation of commonContainerSpec. It is final so that
+  // the object being locked on can never be replaced while in use.
+  private static final Object commonContainerSpecLock = new Object();
+  // Lazily created, shared launch context; only written while holding
+  // commonContainerSpecLock.
+  private static ContainerLaunchContext commonContainerSpec = null;
+
+  /**
+   * Create a {@link LocalResource} record with all the given parameters.
+   *
+   * @param fc file system used to stat the file and resolve its path
+   * @param file path of the resource
+   * @param type resource type recorded in the result
+   * @param visibility resource visibility recorded in the result
+   * @return a record carrying the resolved URL, the length and the
+   *         modification time reported by the file system for {@code file}
+   * @throws IOException if the file status or the resolved path cannot be
+   *         obtained
+   */
+  public static LocalResource createLocalResource(FileSystem fc, Path file,
+      LocalResourceType type, LocalResourceVisibility visibility)
+      throws IOException {
+    FileStatus fstat = fc.getFileStatus(file);
+    URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
+        .getPath()));
+    long resourceSize = fstat.getLen();
+    long resourceModificationTime = fstat.getModificationTime();
+
+    return BuilderUtils.newLocalResource(resourceURL, type, visibility,
+        resourceSize, resourceModificationTime);
+  }
+
+  /**
+   * Create the common {@link ContainerLaunchContext} for all attempts.
+   *
+   * The returned context has empty local resources and environment, no
+   * launch commands, the shuffle service data and the serialized task
+   * credentials.
+   *
+   * @param applicationACLs application ACLs stored in the launch context
+   * @param conf configuration from which the user name is read
+   * @param jobToken job token added to the task credentials and serialized
+   *        into the shuffle service data
+   * @param appId currently unused
+   * @param vertexId currently unused
+   * @param credentials credentials copied into the task credentials when
+   *        security is enabled
+   * @return the shared, per-container-independent launch context
+   * @throws YarnException if serializing the credentials or the shuffle
+   *         service data fails
+   */
+  private static ContainerLaunchContext createCommonContainerLaunchContext(
+      Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
+      Token<JobTokenIdentifier> jobToken,
+      ApplicationId appId, TezVertexID vertexId, Credentials credentials) {
+
+    // Application resources
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+
+    // Application environment
+    Map<String, String> environment = new HashMap<String, String>();
+
+    // Service data
+    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+
+    // Tokens
+    ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[] {});
+    try {
+      // Set up the task credentials buffer
+      Credentials taskCredentials = new Credentials();
+
+      if (UserGroupInformation.isSecurityEnabled()) {
+        LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #"
+            + credentials.numberOfSecretKeys()
+            + " secret keys for NM use for launching container");
+        taskCredentials.addAll(credentials);
+      }
+
+      // LocalStorageToken is needed irrespective of whether security is enabled
+      // or not.
+      TokenCache.setJobToken(jobToken, taskCredentials);
+
+      DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+      LOG.info("Size of containertokens_dob is "
+          + taskCredentials.numberOfTokens());
+      taskCredentials.writeTokenStorageToStream(containerTokens_dob);
+      // Wrap only the bytes actually written, not the whole backing array.
+      taskCredentialsBuffer = ByteBuffer.wrap(containerTokens_dob.getData(), 0,
+          containerTokens_dob.getLength());
+
+      // Add shuffle token
+      LOG.info("Putting shuffle token in serviceData");
+      serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
+          ShuffleHandler.serializeServiceData(jobToken));
+
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+
+    // Construct the actual Container
+    // The null fields are per-container and will be constructed for each
+    // container separately.
+    ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
+        conf.get(MRJobConfig.USER_NAME), localResources,
+        environment, null, serviceData, taskCredentialsBuffer, applicationACLs);
+
+    return container;
+  }
+
+  /**
+   * Build the launch context for one container by combining the cached common
+   * launch context with the per-container pieces.
+   *
+   * @param applicationACLs application ACLs stored in the launch context
+   * @param containerId id of the container being launched; passed to the
+   *        launch command builder
+   * @param jobConf job configuration used for the environment and command
+   * @param vertexId vertex the container is launched for
+   * @param jobToken job token; only used when the common context is created
+   * @param assignedCapability currently unused
+   * @param localResources per-container local resources; these override
+   *        entries of the same name from the common context
+   * @param vertexEnv per-vertex environment; these override entries of the
+   *        same name from the common context
+   * @param taskAttemptListener source of the address placed in the launch
+   *        command
+   * @param credentials credentials; only used when the common context is
+   *        created
+   * @param shouldProfile forwarded to the launch command builder
+   * @return a new launch context for the container
+   */
+  // FIXME does CLC need to work based off DAG id or App Id?
+  @VisibleForTesting
+  public static ContainerLaunchContext createContainerLaunchContext(
+      Map<ApplicationAccessType, String> applicationACLs,
+      ContainerId containerId, JobConf jobConf, TezVertexID vertexId,
+      Token<JobTokenIdentifier> jobToken,
+      Resource assignedCapability, Map<String, LocalResource> localResources,
+      Map<String, String> vertexEnv,
+      TaskAttemptListener taskAttemptListener, Credentials credentials,
+      boolean shouldProfile) {
+
+    // Take a local reference while holding the lock so that everything below
+    // works on one consistently published instance instead of re-reading the
+    // shared static field without synchronization.
+    final ContainerLaunchContext commonSpec;
+    synchronized (commonContainerSpecLock) {
+      if (commonContainerSpec == null) {
+        commonContainerSpec = createCommonContainerLaunchContext(
+            applicationACLs, jobConf, jobToken,
+            vertexId.getDAGId().getApplicationId(),
+            vertexId, credentials);
+      }
+      commonSpec = commonContainerSpec;
+    }
+
+    // Fill in the fields needed per-container that are missing in the common
+    // spec.
+    Map<String, LocalResource> lResources =
+        new TreeMap<String, LocalResource>();
+    lResources.putAll(commonSpec.getLocalResources());
+    lResources.putAll(localResources);
+
+    // Setup environment by cloning from common env.
+    // FIXME common env is empty
+    // MRChildJVM2.setEnv should become a no-op
+    Map<String, String> env = commonSpec.getEnvironment();
+    Map<String, String> myEnv = new HashMap<String, String>(env.size());
+    myEnv.putAll(env);
+    myEnv.putAll(vertexEnv);
+    MapReduceChildJVM2.setVMEnv(myEnv, jobConf, vertexId);
+
+    // Set up the launch command
+    List<String> commands = MapReduceChildJVM2.getVMCommand(
+        taskAttemptListener.getAddress(), jobConf, vertexId, containerId,
+        vertexId.getDAGId().getApplicationId(), shouldProfile);
+
+    // Duplicate the ByteBuffers for access by multiple containers.
+    Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
+    for (Entry<String, ByteBuffer> entry : commonSpec.getServiceData()
+        .entrySet()) {
+      myServiceData.put(entry.getKey(), entry.getValue().duplicate());
+    }
+
+    // Construct the actual Container
+    ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
+        commonSpec.getUser(), lResources, myEnv, commands,
+        myServiceData, commonSpec.getContainerTokens().duplicate(),
+        applicationACLs);
+
+    return container;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,964 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm.container;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ContainerHeartbeatHandler;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
+import org.apache.tez.dag.app.rm.AMSchedulerEventContainerCompleted;
+import org.apache.tez.dag.app.rm.AMSchedulerEventDeallocateContainer;
+import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+@SuppressWarnings("rawtypes")
+public class AMContainerImpl implements AMContainer {
+
+  private static final Log LOG = LogFactory.getLog(AMContainerImpl.class);
+  
+  // Read/write views of one ReentrantReadWriteLock created in the constructor.
+  // Getters take the read lock; event handling takes the write lock.
+  private final ReadLock readLock;
+  private final WriteLock writeLock;
+  // Cached from container.getId() in the constructor.
+  private final ContainerId containerId;
+  // Container to be used for getters on capability, locality etc.
+  private final Container container;
+  private final AppContext appContext;
+  private final ContainerHeartbeatHandler containerHeartbeatHandler;
+  private final TaskAttemptListener taskAttemptListener;
+  // Obtained from appContext.getEventHandler(); used to send outgoing events.
+  protected final EventHandler eventHandler;
+
+  // A copy of this list is handed out by getCompletedTaskAttempts().
+  private final List<TezTaskAttemptID> completedAttempts = new LinkedList<TezTaskAttemptID>();
+
+  // Task payloads keyed by attempt id; pullTaskContext() removes the entry for
+  // the attempt it hands out.
+  // TODO Maybe this should be pulled from the TaskAttempt.s
+  private final Map<TezTaskAttemptID, TezTask> remoteTaskMap =
+      new HashMap<TezTaskAttemptID, TezTask>();
+  
+  // TODO ?? Convert to list and hash.
+  
+  private int shufflePort; 
+  private long idleTimeBetweenTasks = 0;
+  private long lastTaskFinishTime;
+
+  // An assign can happen even during wind down. e.g. NodeFailure caused the
+  // wind down, and an allocation was pending in the AMScheduler. This could
+  // be modelled as a separate state.
+  private boolean nodeFailed = false;
+  private String nodeFailedMessage;
+
+  // Exposed through getQueuedTaskAttempts().
+  private TezTaskAttemptID pendingAttempt;
+  // Exposed through getRunningTaskAttempt().
+  private TezTaskAttemptID runningAttempt;
+  // Initialised to an empty list in the constructor.
+  private List<TezTaskAttemptID> failedAssignments;
+  // Attempt to hand out on the current pull; pullTaskContext() resets it to
+  // null before returning.
+  private TezTaskAttemptID pullAttempt;
+  
+  // Returned by pullTaskContext() when there is no attempt to hand out.
+  // Starts as WAIT_TASK (set in the constructor).
+  private AMContainerTask noAllocationContainerTask;
+  
+  // Task-less responses. NOTE(review): the boolean presumably means
+  // "container should die" - confirm against AMContainerTask.
+  private static final AMContainerTask NO_MORE_TASKS = new AMContainerTask(
+      true, null);
+  private static final AMContainerTask WAIT_TASK = new AMContainerTask(false,
+      null);
+  
+  // Set when an event cannot be handled in the current state (see handle()),
+  // and by transitions that detect an unexpected event.
+  private boolean inError = false;
+  
+  // Launch context; built and then cleared again by LaunchRequestTransition.
+  private ContainerLaunchContext clc;
+
+  // TODO Consider registering with the TAL, instead of the TAL pulling.
+  // Possibly after splitting TAL and ContainerListener.
+
+  // TODO What should be done with pendingAttempts. Nullify when handled ?
+  // Add them to failed ta list ? Some historic information should be maintained.
+
+  // TODO Create a generic ERROR state. Container tries informing relevant components in this case.
+
+
+  // Per-instance state machine, created from the shared factory below in the
+  // constructor.
+  private final StateMachine<AMContainerState, AMContainerEventType, AMContainerEvent> stateMachine;
+  // Transition table shared by all AMContainerImpl instances. The initial
+  // state is ALLOCATED. Each addTransition(...) entry lists: the state the
+  // container is in, the resulting state (or the set of possible resulting
+  // states), the event type(s) that trigger it and, optionally, the transition
+  // object to run. Entries are grouped by starting state, in the order
+  // ALLOCATED, LAUNCHING, IDLE, RUNNING, STOP_REQUESTED, STOPPING, COMPLETED.
+  private static final StateMachineFactory
+      <AMContainerImpl, AMContainerState, AMContainerEventType, AMContainerEvent> 
+      stateMachineFactory = 
+      new StateMachineFactory<AMContainerImpl, AMContainerState, AMContainerEventType, AMContainerEvent>(
+      AMContainerState.ALLOCATED)
+
+        .addTransition(AMContainerState.ALLOCATED, AMContainerState.LAUNCHING, AMContainerEventType.C_LAUNCH_REQUEST, new LaunchRequestTransition())
+        .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtAllocatedTransition())
+        .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtAllocatedTransition())
+        .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtAllocatedTransition())
+        .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtAllocatedTransition())
+        .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), new ErrorTransition())
+
+        .addTransition(AMContainerState.LAUNCHING, EnumSet.of(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED), AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptTransition())
+        .addTransition(AMContainerState.LAUNCHING, AMContainerState.IDLE, AMContainerEventType.C_LAUNCHED, new LaunchedTransition())
+        .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_FAILED, new LaunchFailedTransition())
+        // TODO CREUSE : Maybe, consider sending back an attempt if the container asks for one in this state. Waiting for a LAUNCHED event from the NMComm may delay the task allocation.
+        .addTransition(AMContainerState.LAUNCHING, AMContainerState.LAUNCHING, AMContainerEventType.C_PULL_TA) // Is assuming the pullAttempt will be null.
+        .addTransition(AMContainerState.LAUNCHING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtLaunchingTransition())
+        .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtLaunchingTransition())
+        .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtLaunchingTransition())
+        .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), new ErrorAtLaunchingTransition())
+
+        .addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED), AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtIdleTransition())
+        .addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.RUNNING, AMContainerState.IDLE), AMContainerEventType.C_PULL_TA, new PullTAAtIdleTransition())
+        .addTransition(AMContainerState.IDLE, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtIdleTransition())
+        .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtIdleTransition())
+        .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, new TimedOutAtIdleTransition())
+        .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtIdleTransition())
+        .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), new ErrorAtIdleTransition())
+        
+        .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtRunningTransition())
+        .addTransition(AMContainerState.RUNNING, AMContainerState.RUNNING, AMContainerEventType.C_PULL_TA)
+        .addTransition(AMContainerState.RUNNING, AMContainerState.IDLE, AMContainerEventType.C_TA_SUCCEEDED, new TASucceededAtRunningTransition())
+        .addTransition(AMContainerState.RUNNING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtRunningTransition())
+        .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtRunningTransition())
+        .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, new TimedOutAtRunningTransition())
+        .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtRunningTransition())
+        .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), new ErrorAtRunningTransition())
+        
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition())
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_SENT)
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_FAILED, new NMStopRequestFailedTransition())
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtNMStopRequestedTransition())
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TIMED_OUT))
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_LAUNCH_REQUEST, new ErrorAtNMStopRequestedTransition())
+        
+        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
+        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
+        .addTransition(AMContainerState.STOPPING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition())
+        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
+        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
+        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_REQUEST, new ErrorAtStoppingTransition())
+        
+        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtCompletedTransition())
+        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
+        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
+        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_COMPLETED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
+
+        .installTopology();
+
+  // Note: Containers will not reach their final state if the RM link is broken,
+  // AM shutdown should not wait for this.
+
+  // Attempting to use a container based purely on resources required, etc needs
+  // additional change - JvmID, YarnChild, etc depend on TaskType.
+  public AMContainerImpl(Container container, ContainerHeartbeatHandler chh,
+      TaskAttemptListener tal, AppContext appContext) {
+    ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+    this.readLock = rwLock.readLock();
+    this.writeLock = rwLock.writeLock();
+    this.container = container;
+    this.containerId = container.getId();
+    this.eventHandler = appContext.getEventHandler();
+    this.appContext = appContext;
+    this.containerHeartbeatHandler = chh;
+    this.taskAttemptListener = tal;
+    this.failedAssignments = new LinkedList<TezTaskAttemptID>();
+
+    this.noAllocationContainerTask = WAIT_TASK; 
+    this.stateMachine = stateMachineFactory.make(this);
+  }
+  
+  /**
+   * Returns the current state of the state machine, read under the read lock.
+   */
+  @Override
+  public AMContainerState getState() {
+    final AMContainerState current;
+    readLock.lock();
+    try {
+      current = stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+    return current;
+  }
+
+  /**
+   * Returns the id of the tracked container, as cached at construction time.
+   */
+  @Override
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+  
+  /**
+   * Returns the container passed to the constructor.
+   */
+  @Override
+  public Container getContainer() {
+    return container;
+  }
+
+  /**
+   * Returns a new list holding the completed attempts, copied under the read
+   * lock so that callers never see the internal list.
+   */
+  @Override
+  public List<TezTaskAttemptID> getCompletedTaskAttempts() {
+    final List<TezTaskAttemptID> snapshot;
+    readLock.lock();
+    try {
+      snapshot = new ArrayList<TezTaskAttemptID>(completedAttempts);
+    } finally {
+      readLock.unlock();
+    }
+    return snapshot;
+  }
+
+  /**
+   * Returns the attempts queued on this container, read under the read lock.
+   *
+   * @return an immutable list holding the pending attempt, or an empty list
+   *         when no attempt is pending (never a list containing
+   *         {@code null})
+   */
+  @Override
+  public List<TezTaskAttemptID> getQueuedTaskAttempts() {
+    readLock.lock();
+    try {
+      if (this.pendingAttempt == null) {
+        // Nothing queued: report that as "no elements" rather than as a
+        // single null element that every caller would have to filter out.
+        return Collections.emptyList();
+      }
+      return Collections.singletonList(this.pendingAttempt);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the value of the running-attempt field, read under the read lock.
+   * The result is {@code null} while that field is unset.
+   */
+  @Override
+  public TezTaskAttemptID getRunningTaskAttempt() {
+    final TezTaskAttemptID current;
+    readLock.lock();
+    try {
+      current = runningAttempt;
+    } finally {
+      readLock.unlock();
+    }
+    return current;
+  }
+  
+  /**
+   * Returns the shuffle port recorded for this container, read under the read
+   * lock.
+   */
+  @Override
+  public int getShufflePort() {
+    final int port;
+    readLock.lock();
+    try {
+      port = shufflePort;
+    } finally {
+      readLock.unlock();
+    }
+    return port;
+  }
+
+  /**
+   * Feeds an event into the state machine while holding the write lock.
+   *
+   * An event that is not valid in the current state is logged and marks the
+   * container as being in error; it is not rethrown. State changes are
+   * logged.
+   *
+   * @param event the event to process
+   */
+  @Override
+  public void handle(AMContainerEvent event) {
+    this.writeLock.lock();
+    // Everything after lock() runs inside try/finally, so the write lock is
+    // released even if building the log message below throws.
+    try {
+      final AMContainerState oldState = getState();
+      LOG.info("DEBUG: Processing AMContainerEvent " + event.getContainerId()
+          + " of type " + event.getType() + " while in state: " + oldState
+          + ". Event: " + event);
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle event " + event.getType()
+            + " at current state " + oldState + " for ContainerId "
+            + this.containerId, e);
+        inError = true;
+        // TODO Can't set state to COMPLETED. Add a default error state.
+      }
+      final AMContainerState newState = getState();
+      if (oldState != newState) {
+        LOG.info("AMContainer " + this.containerId + " transitioned from "
+            + oldState + " to " + newState);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  private void sendEvent(Event<?> event) {
+    this.eventHandler.handle(event);
+  }
+
+  /**
+   * Synchronously asks this container for the next task to run.
+   *
+   * A C_PULL_TA event is processed under the write lock. If an attempt is
+   * available for pulling afterwards, its task is removed from the remote task
+   * map and returned; otherwise the current "no allocation" response is
+   * returned. The pulled attempt is always cleared before the lock is
+   * released.
+   *
+   * Design note: push the TaskAttempt to the TAL, instead of the TAL pulling
+   * when a JVM asks for a TaskAttempt.
+   *
+   * @return the task to run, or the current "no allocation" response
+   */
+  public AMContainerTask pullTaskContext() {
+    writeLock.lock();
+    try {
+      final AMContainerEvent pullEvent =
+          new AMContainerEvent(containerId, AMContainerEventType.C_PULL_TA);
+      handle(pullEvent);
+      if (pullAttempt != null) {
+        final TezTask pulledTask = remoteTaskMap.remove(pullAttempt);
+        return new AMContainerTask(false, pulledTask);
+      }
+      return noAllocationContainerTask;
+    } finally {
+      pullAttempt = null;
+      writeLock.unlock();
+    }
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  //                   Start of Transition Classes                            //
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Transition run for a launch request: builds the launch context from the
+   * request, registers with the task attempt listener and sends the start
+   * request to the NM.
+   */
+  protected static class LaunchRequestTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      final AMContainerEventLaunchRequest launchRequest =
+          (AMContainerEventLaunchRequest) cEvent;
+      final JobConf launchConf = new JobConf(launchRequest.getConf());
+
+      final ContainerLaunchContext launchContext =
+          AMContainerHelpers.createContainerLaunchContext(
+              container.appContext.getApplicationACLs(),
+              container.getContainerId(),
+              launchConf,
+              launchRequest.getVertexId(),
+              launchRequest.getJobToken(),
+              container.getContainer().getResource(),
+              launchRequest.getLocalResources(),
+              launchRequest.getEnvironment(),
+              container.taskAttemptListener,
+              launchRequest.getCredentials(),
+              launchRequest.shouldProfile());
+      container.clc = launchContext;
+
+      container.registerWithTAListener();
+      container.sendStartRequestToNM();
+      LOG.info("Sending Launch Request for Container with id: " +
+          container.container.getId());
+
+      // Forget about the clc to save resources. At some point, part of the clc
+      // info may need to be exposed to the scheduler to figure out whether a
+      // container can be used for a specific TaskAttempt.
+      container.clc = null;
+    }
+  }
+
+  /**
+   * Handles a task attempt being assigned before the container was launched.
+   * The container is flagged as in error, the attempt is told the container
+   * terminated, and the scheduler receives completed and deallocate events.
+   */
+  protected static class AssignTaskAttemptAtAllocatedTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      AMContainerEventAssignTA assignEvent = (AMContainerEventAssignTA) cEvent;
+      TezTaskAttemptID attemptId = assignEvent.getTaskAttemptId();
+
+      container.inError = true;
+      container.maybeSendNodeFailureForFailedAssignment(attemptId);
+
+      String diagnostics =
+          "AMScheduler Error: TaskAttempt allocated to unlaunched container: " +
+              container.getContainerId();
+      container.sendTerminatedToTaskAttempt(attemptId, diagnostics);
+
+      container.sendCompletedToScheduler();
+      container.deAllocate();
+      LOG.warn("Unexpected TA Assignment: TAId: " + attemptId +
+          "  for ContainerId: " + container.getContainerId() +
+          " while in state: " + container.getState());
+    }
+  }
+
+  /**
+   * Handles a completed event for a container that was never launched: tells
+   * the scheduler, forwards the diagnostics to any known attempts and logs
+   * non-empty diagnostics.
+   */
+  protected static class CompletedAtAllocatedTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      AMContainerEventCompleted completed = (AMContainerEventCompleted) cEvent;
+      container.sendCompletedToScheduler();
+      container.sendDiagUpdateOnContainerComplete(completed);
+
+      String diagnostics = completed.getContainerStatus().getDiagnostics();
+      if (diagnostics != null && !diagnostics.equals("")) {
+        LOG.info("Container " + container.getContainerId()
+            + " exited with diagnostics set to " + diagnostics);
+      }
+    }
+  }
+
+  /**
+   * Handles a stop request for a container that was never launched by sending
+   * the scheduler a completed event followed by a deallocate event.
+   */
+  protected static class StopRequestAtAllocatedTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      // A stop request carries no information about the node's health, so the
+      // nodeFailed flag and nodeFailedMessage are left untouched. Setting them
+      // here would make maybeSendNodeFailureForFailedAssignment() report a
+      // node failure that never happened.
+      // TODO why are these sent. no need to send these now.
+      container.sendCompletedToScheduler();
+      container.deAllocate();
+    }
+  }
+
+  /**
+   * Handles a node failure for a container that was never launched: records
+   * the failure on the container, then sends the scheduler a completed event
+   * followed by a deallocate event.
+   */
+  protected static class NodeFailedAtAllocatedTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      // Remember the failure so that maybeSendNodeFailureForFailedAssignment()
+      // can inform attempts that get assigned to this container afterwards.
+      container.nodeFailed = true;
+      if (cEvent instanceof DiagnosableEvent) {
+        container.nodeFailedMessage = ((DiagnosableEvent) cEvent)
+            .getDiagnosticInfo();
+      }
+      container.sendCompletedToScheduler();
+      container.deAllocate();
+    }
+  }
+
+  /**
+   * Handles an unexpected event: marks the container as in error, sends the
+   * scheduler a completed event and a deallocate event, and logs the event.
+   */
+  protected static class ErrorTransition extends ErrorBaseTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      // Let the base transition set inError; without this call the flag was
+      // never raised on error paths that go through this class.
+      super.transition(container, cEvent);
+      container.sendCompletedToScheduler();
+      container.deAllocate();
+      LOG.info(
+          "Unexpected event type: " + cEvent.getType() + " while in state: " +
+              container.getState() + ". Event: " + cEvent);
+    }
+  }
+
+  /**
+   * Handles assignment of a task attempt. When no attempt is pending, the new
+   * attempt becomes the pending one, its remote task context is stored in
+   * {@code remoteTaskMap} and the state is left unchanged. When an attempt is
+   * already pending, the assignment is treated as an error and the container
+   * moves to STOP_REQUESTED.
+   */
+  protected static class AssignTaskAttemptTransition implements
+      MultipleArcTransition<AMContainerImpl, AMContainerEvent, AMContainerState> {
+
+    @Override
+    public AMContainerState transition(
+        AMContainerImpl container, AMContainerEvent cEvent) {
+      AMContainerEventAssignTA assignEvent = (AMContainerEventAssignTA) cEvent;
+
+      if (container.pendingAttempt == null) {
+        container.pendingAttempt = assignEvent.getTaskAttemptId();
+        LOG.info("DEBUG: AssignTA: attempt: "
+            + assignEvent.getRemoteTaskContext());
+        container.remoteTaskMap.put(
+            assignEvent.getTaskAttemptId(),
+            assignEvent.getRemoteTaskContext());
+        return container.getState();
+      }
+
+      // A second assignment while one is still pending. This may include a
+      // couple of additional (harmless) unregister calls to the
+      // taskAttemptListener and containerHeartbeatHandler - in case of assign
+      // at any state prior to IDLE.
+      container.handleExtraTAAssign(assignEvent, container.pendingAttempt);
+      // TODO XXX: Verify that it's ok to send in a NM_STOP_REQUEST. The
+      // NMCommunicator should be able to handle this. The STOP_REQUEST would
+      // only go out after the START_REQUEST.
+      return AMContainerState.STOP_REQUESTED;
+    }
+  }
+
+  /**
+   * Handles the launched event: stores the shuffle port reported by the event
+   * and registers the container with the container heartbeat handler.
+   */
+  protected static class LaunchedTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      AMContainerEventLaunched launched = (AMContainerEventLaunched) cEvent;
+      container.shufflePort = launched.getShufflePort();
+      container.registerWithContainerListener();
+    }
+  }
+
+  /**
+   * Handles a failed launch: sends the failure message to the pending attempt
+   * (if any) as a terminating event, unregisters the container from the task
+   * attempt listener and sends a deallocate event.
+   */
+  protected static class LaunchFailedTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      if (container.pendingAttempt != null) {
+        String failureMessage =
+            ((AMContainerEventLaunchFailed) cEvent).getMessage();
+        container.sendTerminatingToTA(container.pendingAttempt, failureMessage);
+      }
+      container.unregisterFromTAListener();
+      container.deAllocate();
+    }
+  }
+
+  /**
+   * Handles a completed event received while the container is launching. A
+   * pending attempt is told the container terminated, diagnostics are
+   * forwarded, the container is unregistered from the task attempt listener
+   * and the scheduler receives a completed event.
+   */
+  protected static class CompletedAtLaunchingTransition
+      implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
+      if (container.pendingAttempt != null) {
+        String errorMessage = getMessage(container, event);
+        container.sendTerminatedToTaskAttempt(container.pendingAttempt,
+            errorMessage);
+        LOG.warn(errorMessage);
+      }
+      container.sendDiagUpdateOnContainerComplete(event);
+      container.unregisterFromTAListener();
+      container.sendCompletedToScheduler();
+      String diag = event.getContainerStatus().getDiagnostics();
+      if (diag != null && !diag.equals("")) {
+        LOG.info("Container " + container.getContainerId()
+            + " exited with diagnostics set to " + diag);
+      }
+    }
+
+    /**
+     * Builds the message sent to the pending attempt. Subclasses override
+     * this to describe the state they handle.
+     *
+     * @param container the container that completed
+     * @param event the completed event carrying the container status
+     * @return the message for the attempt
+     */
+    public String getMessage(AMContainerImpl container,
+        AMContainerEventCompleted event) {
+      // A space follows "Container" so the id does not run into the word.
+      return "Container " + container.getContainerId()
+          + " COMPLETED while trying to launch. Diagnostics: ["
+          + event.getContainerStatus().getDiagnostics() + "]";
+    }
+  }
+
+  protected static class StopRequestAtLaunchingTransition
+      implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      if (container.pendingAttempt != null) {
+        container.sendTerminatingToTA(container.pendingAttempt,
+            getMessage(container, cEvent));
+      }
+      container.unregisterFromTAListener();
+      container.sendStopRequestToNM();
+    }
+
+    public String getMessage(
+        AMContainerImpl container, AMContainerEvent event) {
+      return "Container " + container.getContainerId() +
+          " received a STOP_REQUEST";
+    }
+  }
+
+  /**
+   * Shared handling for node failure. Marks the node as failed, records the
+   * diagnostic (when the event provides one) and sends a node-failure event to
+   * every failed assignment, completed attempt, pending attempt and running
+   * attempt. Pending and running attempts also get a terminating event.
+   */
+  protected static class NodeFailedBaseTransition
+      implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+
+      container.nodeFailed = true;
+      String errorMessage = null;
+      if (cEvent instanceof DiagnosableEvent) {
+        errorMessage = ((DiagnosableEvent) cEvent).getDiagnosticInfo();
+        // Keep the diagnostic so maybeSendNodeFailureForFailedAssignment()
+        // can pass it to attempts assigned after the failure.
+        container.nodeFailedMessage = errorMessage;
+      }
+
+      for (TezTaskAttemptID taId : container.failedAssignments) {
+        container.sendNodeFailureToTA(taId, errorMessage);
+      }
+      for (TezTaskAttemptID taId : container.completedAttempts) {
+        // TODO XXX: Make sure TaskAttempt knows how to handle kills to REDUCEs.
+        container.sendNodeFailureToTA(taId, errorMessage);
+      }
+
+      if (container.pendingAttempt != null) {
+        container.sendNodeFailureToTA(container.pendingAttempt, errorMessage);
+        container.sendTerminatingToTA(container.pendingAttempt, "Node failure");
+      }
+      if (container.runningAttempt != null) {
+        container.sendNodeFailureToTA(container.runningAttempt, errorMessage);
+        container.sendTerminatingToTA(container.runningAttempt, "Node failure");
+      }
+    }
+  }
+
+  /**
+   * Node failure while launching: runs the shared node-failure handling, then
+   * unregisters the container from the task attempt listener and sends a
+   * deallocate event.
+   */
+  protected static class NodeFailedAtLaunchingTransition
+      extends NodeFailedBaseTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      container.unregisterFromTAListener();
+      container.deAllocate();
+    }
+  }
+
+  protected static class ErrorAtLaunchingTransition
+      extends ErrorTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      if (container.pendingAttempt != null) {
+        container.sendTerminatedToTaskAttempt(container.pendingAttempt,
+            "Container " + container.getContainerId() +
+                " hit an invalid transition - " + cEvent.getType() + " at " +
+                container.getState());
+      }
+      container.unregisterFromTAListener();
+    }
+  }
+
+  protected static class AssignTaskAttemptAtIdleTransition
+      extends AssignTaskAttemptTransition {
+    @Override
+    public AMContainerState transition(
+        AMContainerImpl container, AMContainerEvent cEvent) {
+      LOG.info("DEBUG: AssignTAAtIdle: attempt: " +
+          ((AMContainerEventAssignTA) cEvent).getRemoteTaskContext());
+      return super.transition(container, cEvent);
+    }
+  }
+
+  /**
+   * Handles a pull request while idle. With a pending attempt, that attempt
+   * becomes both the pulled and the running attempt, idle time since the last
+   * finished task is accumulated, and the container moves to RUNNING. Without
+   * one, the container stays IDLE.
+   */
+  protected static class PullTAAtIdleTransition implements
+      MultipleArcTransition<AMContainerImpl, AMContainerEvent, AMContainerState> {
+
+    @Override
+    public AMContainerState transition(
+        AMContainerImpl container, AMContainerEvent cEvent) {
+      if (container.pendingAttempt == null) {
+        return AMContainerState.IDLE;
+      }
+
+      // This will be invoked as part of the PULL_REQUEST - so pullAttempt
+      // should ideally only end up being populated during the duration of
+      // this call, which is in a write lock. pullRequest() should move this
+      // to the running state.
+      container.pullAttempt = container.pendingAttempt;
+      container.runningAttempt = container.pendingAttempt;
+      container.pendingAttempt = null;
+
+      if (container.lastTaskFinishTime != 0) {
+        // Read the clock once so the logged increment is exactly the value
+        // added to the running total.
+        long idleTimeDiff =
+            System.currentTimeMillis() - container.lastTaskFinishTime;
+        LOG.info("DEBUG: Computing idle time for container: " +
+            container.getContainerId() + ", lastFinishTime: " +
+            container.lastTaskFinishTime + ", Incremented by: " +
+            idleTimeDiff);
+        container.idleTimeBetweenTasks += idleTimeDiff;
+      }
+      LOG.info("Assigned taskAttempt [" + container.runningAttempt +
+          "] to container: [" + container.getContainerId() + "]");
+      return AMContainerState.RUNNING;
+    }
+  }
+
+  /**
+   * Completed event while idle: runs the launching-state completion handling,
+   * then unregisters the container from the container heartbeat handler.
+   */
+  protected static class CompletedAtIdleTransition
+      extends CompletedAtLaunchingTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      // super.transition() already forwards the completion diagnostics to the
+      // attempts, so they are not sent a second time here.
+      super.transition(container, cEvent);
+      container.unregisterFromContainerListener();
+    }
+
+    /**
+     * Builds the message sent to the attempt for a container that completed.
+     *
+     * @param container the container that completed
+     * @param event the completed event carrying the container status
+     * @return the message for the attempt
+     */
+    @Override
+    public String getMessage(
+        AMContainerImpl container, AMContainerEventCompleted event) {
+      return "Container " + container.getContainerId() + " COMPLETED"
+          + " with diagnostics set to ["
+          + event.getContainerStatus().getDiagnostics() + "]";
+    }
+  }
+
+  protected static class StopRequestAtIdleTransition
+      extends StopRequestAtLaunchingTransition {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      LOG.info("DEBUG: IdleTimeBetweenTasks: " + container.idleTimeBetweenTasks);
+      container.unregisterFromContainerListener();
+    }
+  }
+
+  /**
+   * Timeout while idle: behaves like a stop request at idle, but tells the
+   * attempt that the container timed out.
+   */
+  protected static class TimedOutAtIdleTransition
+      extends StopRequestAtIdleTransition {
+
+    /**
+     * Builds the timeout message used in place of the stop-request message.
+     *
+     * @param container the container that timed out
+     * @param event the event that triggered the transition
+     * @return the message for the attempt
+     */
+    @Override
+    public String getMessage(
+        AMContainerImpl container, AMContainerEvent event) {
+      return "Container " + container.getContainerId() +
+          " timed out";
+    }
+  }
+
+  /**
+   * Node failure while idle: runs the launching-state node-failure handling,
+   * then unregisters the container from the container heartbeat handler.
+   */
+  protected static class NodeFailedAtIdleTransition
+      extends NodeFailedAtLaunchingTransition {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      container.unregisterFromContainerListener();
+    }
+  }
+
+  /**
+   * Unexpected event while idle: runs the launching-state error handling,
+   * then unregisters the container from the container heartbeat handler.
+   */
+  protected static class ErrorAtIdleTransition
+      extends ErrorAtLaunchingTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      container.unregisterFromContainerListener();
+    }
+  }
+
+  /**
+   * Assignment while an attempt is already running: unregisters the running
+   * attempt from the task attempt listener and treats the new assignment as
+   * an extra, erroneous one.
+   */
+  protected static class AssignTaskAttemptAtRunningTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      container.unregisterAttemptFromListener(container.runningAttempt);
+      container.handleExtraTAAssign(
+          (AMContainerEventAssignTA) cEvent, container.runningAttempt);
+    }
+  }
+
+  protected static class TASucceededAtRunningTransition
+      implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      container.lastTaskFinishTime = System.currentTimeMillis();
+      container.completedAttempts.add(container.runningAttempt);
+      container.unregisterAttemptFromListener(container.runningAttempt);
+      container.runningAttempt = null;
+    }
+  }
+
+  /**
+   * Completed event while an attempt is running: tells the running attempt
+   * the container terminated, unregisters it from the task attempt listener
+   * and then runs the idle-state completion handling.
+   */
+  protected static class CompletedAtRunningTransition
+      extends CompletedAtIdleTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      String diagnostics =
+          getMessage(container, (AMContainerEventCompleted) cEvent);
+      container.sendTerminatedToTaskAttempt(
+          container.runningAttempt, diagnostics);
+      container.unregisterAttemptFromListener(container.runningAttempt);
+      super.transition(container, cEvent);
+    }
+  }
+
+  /**
+   * Stop request while an attempt is running: unregisters the running attempt
+   * from the task attempt listener, sends it a terminating event and then
+   * runs the idle-state stop handling.
+   */
+  protected static class StopRequestAtRunningTransition
+      extends StopRequestAtIdleTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+
+      container.unregisterAttemptFromListener(container.runningAttempt);
+      // Use getMessage() so that subclasses overriding it (for example the
+      // timed-out variant) give the running attempt the same reason as the
+      // pending attempt, and so the text has consistent spacing.
+      container.sendTerminatingToTA(container.runningAttempt,
+          getMessage(container, cEvent));
+      super.transition(container, cEvent);
+    }
+  }
+
+  protected static class TimedOutAtRunningTransition
+      extends StopRequestAtRunningTransition {
+    @Override
+    public String getMessage(
+        AMContainerImpl container, AMContainerEvent event) {
+      return "Container " + container.getContainerId() +
+          " timed out";
+    }
+  }
+
+  /**
+   * Node failure while an attempt is running: runs the idle-state
+   * node-failure handling, then unregisters the running attempt from the task
+   * attempt listener.
+   */
+  protected static class NodeFailedAtRunningTransition
+      extends NodeFailedAtIdleTransition {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      container.unregisterAttemptFromListener(container.runningAttempt);
+    }
+  }
+
+  protected static class ErrorAtRunningTransition
+      extends ErrorAtIdleTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      container.unregisterAttemptFromListener(container.runningAttempt);
+      container.sendTerminatedToTaskAttempt(container.runningAttempt,
+          "Container " + container.getContainerId() +
+              " hit an invalid transition - " + cEvent.getType() + " at " +
+              container.getState());
+    }
+  }
+
+  /**
+   * Assignment while the container is winding down: flags the container as in
+   * error, sends the attempt a terminating event and records the assignment
+   * as failed.
+   */
+  protected static class AssignTAAtWindDownTransition
+      implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      AMContainerEventAssignTA assignEvent = (AMContainerEventAssignTA) cEvent;
+      TezTaskAttemptID attemptId = assignEvent.getTaskAttemptId();
+
+      container.inError = true;
+      String diagnostics = "AttemptId: " + attemptId +
+          " cannot be allocated to container: " + container.getContainerId() +
+          " in " + container.getState() + " state";
+
+      container.maybeSendNodeFailureForFailedAssignment(attemptId);
+      container.sendTerminatingToTA(attemptId, diagnostics);
+      container.registerFailedTAAssignment(attemptId);
+    }
+  }
+  
+  /**
+   * Pull request after the container has been asked to stop: switches the
+   * no-allocation placeholder to NO_MORE_TASKS.
+   *
+   * Hack to some extent. This allocation should be done while entering one of
+   * the post-running states, instead of being a transition on the post stop
+   * states.
+   */
+  protected static class PullTAAfterStopTransition
+      implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      // From now on pullTaskContext() hands out NO_MORE_TASKS when nothing
+      // was pulled.
+      container.noAllocationContainerTask = NO_MORE_TASKS;
+    }
+  }
+
+  /**
+   * Completed event while the container is winding down: tells the pending
+   * attempt, the running attempt and every failed assignment that the
+   * container terminated, logs non-empty diagnostics and sends the scheduler
+   * a completed event.
+   */
+  protected static class CompletedAtWindDownTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      AMContainerEventCompleted completed = (AMContainerEventCompleted) cEvent;
+      String diagnostics = completed.getContainerStatus().getDiagnostics();
+
+      if (container.pendingAttempt != null) {
+        container.sendTerminatedToTaskAttempt(
+            container.pendingAttempt, diagnostics);
+      }
+      if (container.runningAttempt != null) {
+        container.sendTerminatedToTaskAttempt(
+            container.runningAttempt, diagnostics);
+      }
+      for (TezTaskAttemptID failedId : container.failedAssignments) {
+        container.sendTerminatedToTaskAttempt(failedId, diagnostics);
+      }
+
+      if (diagnostics != null && !diagnostics.equals("")) {
+        LOG.info("Container " + container.getContainerId()
+            + " exited with diagnostics set to " + diagnostics);
+      }
+      container.sendCompletedToScheduler();
+    }
+  }
+
+  /**
+   * The NM stop request failed: sends a deallocate event for the container.
+   */
+  // NOTE(review): per the AMContainerState comments, a failed NM stop request
+  // is followed by an RM stop request; presumably the deallocate event is
+  // what triggers it - confirm in the scheduler.
+  protected static class NMStopRequestFailedTransition
+      implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      container.deAllocate();
+    }
+  }
+
+  /**
+   * Node failure after an NM stop was requested: runs the shared node-failure
+   * handling, then sends a deallocate event.
+   */
+  protected static class NodeFailedAtNMStopRequestedTransition
+      extends NodeFailedBaseTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      container.deAllocate();
+    }
+  }
+
+  /**
+   * Unexpected event after an NM stop was requested: runs the stopping-state
+   * error handling, then sends a deallocate event.
+   */
+  protected static class ErrorAtNMStopRequestedTransition
+      extends ErrorAtStoppingTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      container.deAllocate();
+    }
+  }
+
+  /**
+   * Unexpected event while stopping: marks the container as in error, tells
+   * the pending attempt, the running attempt and every failed assignment that
+   * the container terminated (with no message), and sends the scheduler a
+   * completed event.
+   */
+  protected static class ErrorAtStoppingTransition
+      extends ErrorBaseTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      if (container.pendingAttempt != null) {
+        container.sendTerminatedToTaskAttempt(container.pendingAttempt, null);
+      }
+      if (container.runningAttempt != null) {
+        container.sendTerminatedToTaskAttempt(container.runningAttempt, null);
+      }
+      for (TezTaskAttemptID taId : container.failedAssignments) {
+        container.sendTerminatedToTaskAttempt(taId, null);
+      }
+      container.sendCompletedToScheduler();
+    }
+  }
+
+  /**
+   * Base for error transitions: its only action is to set the container's
+   * inError flag. Subclasses must call super.transition() for the flag to be
+   * set.
+   */
+  protected static class ErrorBaseTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      container.inError = true;
+    }
+  }
+
+  /**
+   * Assignment to a container that has already completed: flags the container
+   * as in error, tells the attempt the container terminated and records the
+   * assignment as failed.
+   */
+  protected static class AssignTAAtCompletedTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      // TODO CREUSE CRITICAL: This is completely incorrect. COMPLETED comes
+      // from RMComm directly to the container. Meanwhile, the scheduler may
+      // think the container is still around and assign a task to it. The task
+      // ends up getting a CONTAINER_KILLED message. Task could handle this by
+      // asking for a reschedule in this case. Will end up FAILING the task
+      // instead of KILLING it.
+      container.inError = true;
+
+      TezTaskAttemptID attemptId =
+          ((AMContainerEventAssignTA) cEvent).getTaskAttemptId();
+      String diagnostics = "AttemptId: " + attemptId
+          + " cannot be allocated to container: " + container.getContainerId()
+          + " in COMPLETED state";
+
+      container.maybeSendNodeFailureForFailedAssignment(attemptId);
+      container.sendTerminatedToTaskAttempt(attemptId, diagnostics);
+      container.registerFailedTAAssignment(attemptId);
+    }
+  }
+
+
+  private void handleExtraTAAssign(
+      AMContainerEventAssignTA event, TezTaskAttemptID currentTaId) {
+    this.inError = true;
+    String errorMessage = "AMScheduler Error: Multiple simultaneous " +
+        "taskAttempt allocations to: " + this.getContainerId() +
+        ". Attempts: " + currentTaId + ", " + event.getTaskAttemptId() +
+        ". Current state: " + this.getState();
+    this.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
+    this.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage);
+    this.sendTerminatingToTA(currentTaId, errorMessage);
+    this.registerFailedTAAssignment(event.getTaskAttemptId());
+    LOG.warn(errorMessage);
+    this.sendStopRequestToNM();
+    this.unregisterFromTAListener();
+    this.unregisterFromContainerListener();
+  }
+
+
+  /**
+   * Records a task attempt whose assignment to this container failed.
+   *
+   * @param taId the attempt to add to failedAssignments
+   */
+  protected void registerFailedTAAssignment(TezTaskAttemptID taId) {
+    failedAssignments.add(taId);
+  }
+  
+  /**
+   * Sends an AMSchedulerEventDeallocateContainer for this container's id.
+   */
+  protected void deAllocate() {
+    AMSchedulerEventDeallocateContainer deallocateEvent =
+        new AMSchedulerEventDeallocateContainer(containerId);
+    sendEvent(deallocateEvent);
+  }
+  
+  /**
+   * Sends an AMSchedulerEventContainerCompleted for this container's id.
+   */
+  protected void sendCompletedToScheduler() {
+    AMSchedulerEventContainerCompleted completedEvent =
+        new AMSchedulerEventContainerCompleted(containerId);
+    sendEvent(completedEvent);
+  }
+
+  /**
+   * Forwards the container status diagnostics from a completed event to the
+   * pending attempt and to the running attempt, whichever of them exist.
+   *
+   * @param cEvent the completed event carrying the container status
+   */
+  protected void sendDiagUpdateOnContainerComplete(
+      AMContainerEventCompleted cEvent) {
+    String diagnostics = cEvent.getContainerStatus().getDiagnostics();
+    if (pendingAttempt != null) {
+      TaskAttemptEventDiagnosticsUpdate pendingUpdate =
+          new TaskAttemptEventDiagnosticsUpdate(pendingAttempt, diagnostics);
+      sendEvent(pendingUpdate);
+    }
+    if (runningAttempt != null) {
+      TaskAttemptEventDiagnosticsUpdate runningUpdate =
+          new TaskAttemptEventDiagnosticsUpdate(runningAttempt, diagnostics);
+      sendEvent(runningUpdate);
+    }
+  }
+
+  /**
+   * Sends a TaskAttemptEventContainerTerminated to the given attempt.
+   *
+   * @param taId the attempt to notify
+   * @param message the message to attach; callers in this class may pass null
+   */
+  protected void sendTerminatedToTaskAttempt(
+      TezTaskAttemptID taId, String message) {
+    TaskAttemptEventContainerTerminated terminatedEvent =
+        new TaskAttemptEventContainerTerminated(taId, message);
+    sendEvent(terminatedEvent);
+  }
+
+  /**
+   * Sends a TaskAttemptEventContainerTerminating to the given attempt.
+   *
+   * @param taId the attempt to notify
+   * @param message the message to attach
+   */
+  protected void sendTerminatingToTA(TezTaskAttemptID taId, String message) {
+    TaskAttemptEventContainerTerminating terminatingEvent =
+        new TaskAttemptEventContainerTerminating(taId, message);
+    sendEvent(terminatingEvent);
+  }
+
+  /**
+   * Sends a node-failure event with the stored failure message to the given
+   * attempt, but only when this container's node has been marked as failed.
+   *
+   * @param taId the attempt whose assignment failed
+   */
+  protected void maybeSendNodeFailureForFailedAssignment(TezTaskAttemptID taId) {
+    if (!nodeFailed) {
+      return;
+    }
+    sendNodeFailureToTA(taId, nodeFailedMessage);
+  }
+
+  /**
+   * Sends a TaskAttemptEventNodeFailed to the given attempt.
+   *
+   * @param taId the attempt to notify
+   * @param message the message to attach; callers in this class may pass null
+   */
+  protected void sendNodeFailureToTA(TezTaskAttemptID taId, String message) {
+    TaskAttemptEventNodeFailed nodeFailedEvent =
+        new TaskAttemptEventNodeFailed(taId, message);
+    sendEvent(nodeFailedEvent);
+  }
+
+  /**
+   * Sends an NMCommunicatorLaunchRequestEvent built from the current launch
+   * context ({@code clc}) and this container.
+   */
+  protected void sendStartRequestToNM() {
+    NMCommunicatorLaunchRequestEvent launchRequest =
+        new NMCommunicatorLaunchRequestEvent(clc, container);
+    sendEvent(launchRequest);
+  }
+
+  /**
+   * Sends an NMCommunicatorStopRequestEvent carrying this container's id,
+   * node id and container token.
+   */
+  protected void sendStopRequestToNM() {
+    NMCommunicatorStopRequestEvent stopRequest =
+        new NMCommunicatorStopRequestEvent(
+            containerId, container.getNodeId(), container.getContainerToken());
+    sendEvent(stopRequest);
+  }
+  
+  /**
+   * Unregisters a task attempt from the task attempt listener.
+   *
+   * @param attemptId the attempt to unregister
+   */
+  protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId) {
+    taskAttemptListener.unregisterTaskAttempt(attemptId);
+  }
+
+  /**
+   * Registers this container's id with the task attempt listener as a running
+   * container.
+   */
+  protected void registerWithTAListener() {
+    taskAttemptListener.registerRunningContainer(containerId);
+  }
+
+  /**
+   * Removes this container's id from the task attempt listener's running
+   * containers.
+   */
+  protected void unregisterFromTAListener() {
+    this.taskAttemptListener.unregisterRunningContainer(containerId);
+  }
+
+
+  /**
+   * Registers this container's id with the container heartbeat handler.
+   */
+  protected void registerWithContainerListener() {
+    this.containerHeartbeatHandler.register(this.containerId);
+  }
+
+  /**
+   * Unregisters this container's id from the container heartbeat handler.
+   */
+  protected void unregisterFromContainerListener() {
+    this.containerHeartbeatHandler.unregister(this.containerId);
+  }
+  
+
+  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ContainerHeartbeatHandler;
+import org.apache.tez.dag.app.TaskAttemptListener;
+
+/**
+ * Service that keeps one {@link AMContainer} per {@link ContainerId} and
+ * forwards each {@link AMContainerEvent} to the container it addresses.
+ */
+public class AMContainerMap extends AbstractService implements
+    EventHandler<AMContainerEvent> {
+
+  private static final Log LOG = LogFactory.getLog(AMContainerMap.class);
+
+  // Collaborators handed to every AMContainerImpl this map creates.
+  private final ContainerHeartbeatHandler chh;
+  private final TaskAttemptListener tal;
+  private final AppContext context;
+
+  // Registered containers, keyed by container id.
+  private final ConcurrentHashMap<ContainerId, AMContainer> containerMap;
+
+  /**
+   * Creates an empty map.
+   *
+   * @param chh heartbeat handler passed to new containers
+   * @param tal task attempt listener passed to new containers
+   * @param context application context passed to new containers
+   */
+  public AMContainerMap(ContainerHeartbeatHandler chh, TaskAttemptListener tal,
+      AppContext context) {
+    super("AMContainerMaps");
+    this.chh = chh;
+    this.tal = tal;
+    this.context = context;
+    this.containerMap = new ConcurrentHashMap<ContainerId, AMContainer>();
+  }
+
+  /**
+   * Routes the event to the container named by its container id. Events for
+   * ids that are not in the map are logged and dropped.
+   *
+   * @param event the event to route
+   */
+  @Override
+  public void handle(AMContainerEvent event) {
+    AMContainer target = containerMap.get(event.getContainerId());
+    if (target == null) {
+      LOG.info("Event for unknown container: " + event.getContainerId());
+      return;
+    }
+    target.handle(event);
+  }
+
+  /**
+   * Registers a new AMContainerImpl for the given container unless its id is
+   * already present, in which case the existing entry is kept.
+   *
+   * @param container the container to wrap and register
+   */
+  public void addContainerIfNew(Container container) {
+    AMContainer candidate = new AMContainerImpl(container, chh, tal, context);
+    containerMap.putIfAbsent(container.getId(), candidate);
+  }
+
+  /**
+   * Looks up a registered container.
+   *
+   * @param containerId the id to look up
+   * @return the registered container, or null when the id is unknown
+   */
+  public AMContainer get(ContainerId containerId) {
+    return containerMap.get(containerId);
+  }
+
+  /**
+   * @return the map's own values collection of registered containers
+   */
+  public Collection<AMContainer> values() {
+    return containerMap.values();
+  }
+}
\ No newline at end of file

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerState.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerState.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerState.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerState.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+/**
+ * States of an AMContainer. The transition classes in AMContainerImpl return
+ * these values as the next state of the container.
+ */
+// NOTE(review): the declaration order fixes each constant's ordinal - confirm
+// nothing depends on the ordinals before reordering or inserting states.
+public enum AMContainerState {
+  ALLOCATED,
+  LAUNCHING,
+  IDLE,
+  RUNNING,
+  // indicates a NM stop request has been attempted. This request could fail, in
+  // which case an RM stop request needs to be sent.
+  STOP_REQUESTED, 
+
+  // A stop request has been registered with YARN
+  STOPPING,
+  COMPLETED,
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerState.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,39 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm.container;
+
+import org.apache.tez.common.TezTask;
+
+/**
+ * Immutable holder that pairs a {@link TezTask} with a {@code shouldDie} flag.
+ *
+ * <p>Both values are stored exactly as supplied to the constructor; no
+ * validation or copying is performed.
+ */
+public class AMContainerTask {
+  private final TezTask tezTask;
+  private final boolean shouldDie;
+
+  /**
+   * @param shouldDie value reported by {@link #shouldDie()}
+   * @param tezTask value reported by {@link #getTask()}; kept as given
+   */
+  public AMContainerTask(boolean shouldDie, TezTask tezTask) {
+    this.tezTask = tezTask;
+    this.shouldDie = shouldDie;
+  }
+
+  /** @return the {@code shouldDie} flag passed at construction */
+  public boolean shouldDie() {
+    return shouldDie;
+  }
+
+  /** @return the task passed at construction */
+  public TezTask getTask() {
+    return tezTask;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+/**
+ * Application-master view of a node. Implementations also act as the
+ * {@link EventHandler} for {@link AMNodeEvent}s.
+ */
+public interface AMNode extends EventHandler<AMNodeEvent> {
+
+  /** @return the id of this node */
+  NodeId getNodeId();
+
+  /** @return the current {@link AMNodeState} of this node */
+  AMNodeState getState();
+
+  /** @return the ids of the containers associated with this node */
+  List<ContainerId> getContainers();
+
+  /** @return {@code true} if this node is considered unhealthy */
+  boolean isUnhealthy();
+
+  /** @return {@code true} if this node is considered blacklisted */
+  boolean isBlacklisted();
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Base class for node events: an {@link AMNodeEventType} plus the
+ * {@link NodeId} the event refers to.
+ *
+ * <p>The node id is stored as given and may be {@code null}; for example,
+ * {@code AMNodeEventNodeCountUpdated} passes {@code null}.
+ */
+public class AMNodeEvent extends AbstractEvent<AMNodeEventType> {
+
+  private final NodeId nodeId;
+
+  /**
+   * @param nodeId id of the node this event refers to; not validated
+   * @param type event type handed to {@link AbstractEvent}
+   */
+  public AMNodeEvent(NodeId nodeId, AMNodeEventType type) {
+    super(type);
+    this.nodeId = nodeId;
+  }
+
+  /** @return the node id supplied at construction, possibly {@code null} */
+  public NodeId getNodeId() {
+    return nodeId;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * {@link AMNodeEvent} of type {@link AMNodeEventType#N_CONTAINER_ALLOCATED}
+ * that additionally carries the id of the container concerned.
+ */
+public class AMNodeEventContainerAllocated extends AMNodeEvent {
+
+  private final ContainerId containerId;
+
+  /**
+   * @param nodeId id of the node the event refers to
+   * @param containerId container id to carry; stored as given
+   */
+  public AMNodeEventContainerAllocated(NodeId nodeId, ContainerId containerId) {
+    super(nodeId, AMNodeEventType.N_CONTAINER_ALLOCATED);
+    this.containerId = containerId;
+  }
+
+  /** @return the container id supplied at construction */
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+/**
+ * {@link AMNodeEvent} of type {@link AMNodeEventType#N_NODE_COUNT_UPDATED}
+ * carrying a node count.
+ *
+ * <p>This event is constructed with a {@code null} node id, so
+ * {@link #getNodeId()} returns {@code null} for it.
+ */
+public class AMNodeEventNodeCountUpdated extends AMNodeEvent {
+
+  private final int nodeCount;
+
+  /**
+   * @param nodeCount the count reported by {@link #getNodeCount()}
+   */
+  public AMNodeEventNodeCountUpdated(int nodeCount) {
+    super(null, AMNodeEventType.N_NODE_COUNT_UPDATED);
+    this.nodeCount = nodeCount;
+  }
+
+  /** @return the node count supplied at construction */
+  public int getNodeCount() {
+    return nodeCount;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+import org.apache.hadoop.yarn.api.records.NodeReport;
+
+/**
+ * {@link AMNodeEvent} built from a {@link NodeReport}.
+ *
+ * <p>The node id is taken from the report, and the event type is derived from
+ * the report's health status: {@link AMNodeEventType#N_TURNED_HEALTHY} when the
+ * node is reported healthy, {@link AMNodeEventType#N_TURNED_UNHEALTHY}
+ * otherwise.
+ */
+public class AMNodeEventStateChanged extends AMNodeEvent {
+
+  // Assigned exactly once in the constructor; final to match the other event
+  // classes in this package, which keep all of their payload fields final.
+  private final NodeReport nodeReport;
+
+  /**
+   * @param nodeReport report to derive the node id and event type from; the
+   *        same instance is later returned by {@link #getNodeReport()}
+   * @throws NullPointerException if {@code nodeReport} or its health status
+   *         is {@code null}
+   */
+  public AMNodeEventStateChanged(NodeReport nodeReport) {
+    super(nodeReport.getNodeId(), eventTypeFor(nodeReport));
+    this.nodeReport = nodeReport;
+  }
+
+  /** Maps the health flag of the report to the matching event type. */
+  private static AMNodeEventType eventTypeFor(NodeReport nodeReport) {
+    if (nodeReport.getNodeHealthStatus().getIsNodeHealthy()) {
+      return AMNodeEventType.N_TURNED_HEALTHY;
+    }
+    return AMNodeEventType.N_TURNED_UNHEALTHY;
+  }
+
+  /** @return the report supplied at construction */
+  public NodeReport getNodeReport() {
+    return this.nodeReport;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+/**
+ * {@link AMNodeEvent} of type {@link AMNodeEventType#N_TA_ENDED}, carrying the
+ * container id and task attempt id concerned plus a {@code failed} flag.
+ *
+ * <p>{@link #killed()} is defined purely as the negation of that flag.
+ */
+public class AMNodeEventTaskAttemptEnded extends AMNodeEvent {
+
+  private final ContainerId containerId;
+  private final TezTaskAttemptID taskAttemptId;
+  private final boolean failed;
+
+  /**
+   * @param nodeId id of the node the event refers to
+   * @param containerId container id to carry; stored as given
+   * @param taskAttemptId task attempt id to carry; stored as given
+   * @param failed value reported by {@link #failed()}; {@link #killed()}
+   *        reports the opposite
+   */
+  public AMNodeEventTaskAttemptEnded(NodeId nodeId, ContainerId containerId,
+      TezTaskAttemptID taskAttemptId, boolean failed) {
+    super(nodeId, AMNodeEventType.N_TA_ENDED);
+    this.containerId = containerId;
+    this.taskAttemptId = taskAttemptId;
+    this.failed = failed;
+  }
+
+  /** @return the {@code failed} flag supplied at construction */
+  public boolean failed() {
+    return this.failed;
+  }
+
+  /** @return {@code true} exactly when the {@code failed} flag is {@code false} */
+  public boolean killed() {
+    return !this.failed;
+  }
+
+  /** @return the container id supplied at construction */
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  /** @return the task attempt id supplied at construction */
+  public TezTaskAttemptID getTaskAttemptId() {
+    return taskAttemptId;
+  }
+}
\ No newline at end of file

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+/**
+ * {@link AMNodeEvent} of type {@link AMNodeEventType#N_TA_SUCCEEDED}, carrying
+ * the container id and task attempt id concerned.
+ */
+public class AMNodeEventTaskAttemptSucceeded extends AMNodeEvent {
+
+  // TODO These two parameters really aren't required in this event.
+  private final TezTaskAttemptID taskAttemptId;
+  private final ContainerId containerId;
+
+  /**
+   * @param nodeId id of the node the event refers to
+   * @param containerId container id to carry; stored as given
+   * @param taskAttemptId task attempt id to carry; stored as given
+   */
+  public AMNodeEventTaskAttemptSucceeded(NodeId nodeId,
+      ContainerId containerId, TezTaskAttemptID taskAttemptId) {
+    super(nodeId, AMNodeEventType.N_TA_SUCCEEDED);
+    this.taskAttemptId = taskAttemptId;
+    this.containerId = containerId;
+  }
+
+  /** @return the container id supplied at construction */
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  /** @return the task attempt id supplied at construction */
+  public TezTaskAttemptID getTaskAttemptId() {
+    return taskAttemptId;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,41 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm.node;
+
+/**
+ * Event types carried by {@code AMNodeEvent} and its subclasses.
+ *
+ * <p>The constants are grouped by the component that produces them, as noted
+ * in the comments below.
+ */
+public enum AMNodeEventType {
+  //Producer: Scheduler
+  // Used by AMNodeEventContainerAllocated.
+  N_CONTAINER_ALLOCATED,
+  
+  //Producer: TaskAttempt
+  // Used by AMNodeEventTaskAttemptSucceeded and AMNodeEventTaskAttemptEnded
+  // respectively.
+  N_TA_SUCCEEDED,
+  N_TA_ENDED,
+  
+  //Producer: RMCommunicator
+  // AMNodeEventStateChanged picks one of the two N_TURNED_* types from the
+  // health flag of a NodeReport.
+  N_TURNED_UNHEALTHY,
+  N_TURNED_HEALTHY,
+  N_NODE_COUNT_UPDATED, // for blacklisting.
+  
+  //Producer: AMNodeManager
+  N_IGNORE_BLACKLISTING_ENABLED,
+  N_IGNORE_BLACKLISTING_DISABLED,
+  
+  // Producer: AMNode - Will not reach AMNodeImpl. Used to compute whether
+  // blacklisting should be ignored.
+  N_NODE_WAS_BLACKLISTED
+}
\ No newline at end of file

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java
------------------------------------------------------------------------------
    svn:eol-style = native