You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC

svn commit: r1457129 [26/38] - in /incubator/tez: ./ tez-ampool/ tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/ tez-ampool/src/main/conf/ tez-ampool/src/main/java/ tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventAssignTA.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventAssignTA.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventAssignTA.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventAssignTA.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.mapreduce.task.impl.MRTaskContext;
+
+/**
+ * Container event of type {@link AMContainerEventType#C_ASSIGN_TA}. It carries
+ * a task attempt id together with the {@link MRTaskContext} supplied for that
+ * attempt.
+ */
+public class AMContainerEventAssignTA extends AMContainerEvent {
+
+  // TODO Maybe have the TAL pull the remoteTask from the TaskAttempt itself ?
+  private final MRTaskContext remoteTaskContext;
+  private final TaskAttemptId attemptId;
+
+  /**
+   * @param containerId id of the container this event is addressed to
+   * @param attemptId id of the task attempt carried by this event
+   * @param remoteTaskContext task context carried alongside the attempt id
+   */
+  public AMContainerEventAssignTA(ContainerId containerId,
+      TaskAttemptId attemptId, MRTaskContext remoteTaskContext) {
+    super(containerId, AMContainerEventType.C_ASSIGN_TA);
+    this.remoteTaskContext = remoteTaskContext;
+    this.attemptId = attemptId;
+  }
+
+  /** @return the task attempt id given at construction */
+  public TaskAttemptId getTaskAttemptId() {
+    return attemptId;
+  }
+
+  /** @return the task context given at construction */
+  public MRTaskContext getRemoteTaskContext() {
+    return remoteTaskContext;
+  }
+
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventCompleted.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventCompleted.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventCompleted.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventCompleted.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+/**
+ * Container event of type {@link AMContainerEventType#C_COMPLETED}. The target
+ * container id is taken from the wrapped {@link ContainerStatus}.
+ */
+public class AMContainerEventCompleted extends AMContainerEvent {
+
+  private final ContainerStatus containerStatus;
+
+  /**
+   * @param containerStatus status record for the container; its container id
+   *          is used as the id of this event, so it must not be null
+   */
+  public AMContainerEventCompleted(ContainerStatus containerStatus) {
+    super(containerStatus.getContainerId(), AMContainerEventType.C_COMPLETED);
+    this.containerStatus = containerStatus;
+  }
+
+  /** @return the status record given at construction */
+  public ContainerStatus getContainerStatus() {
+    return containerStatus;
+  }
+
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunchFailed.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunchFailed.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunchFailed.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunchFailed.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Container event of type {@link AMContainerEventType#C_LAUNCH_FAILED}
+ * carrying a free-form message.
+ */
+public class AMContainerEventLaunchFailed extends AMContainerEvent {
+
+  private final String message;
+
+  /**
+   * @param containerId id of the container this event is addressed to
+   * @param message text attached to the event
+   */
+  public AMContainerEventLaunchFailed(ContainerId containerId,
+      String message) {
+    super(containerId, AMContainerEventType.C_LAUNCH_FAILED);
+    this.message = message;
+  }
+
+  /** @return the message given at construction */
+  public String getMessage() {
+    return message;
+  }
+
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunchRequest.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunchRequest.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunchRequest.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunchRequest.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Container event of type {@link AMContainerEventType#C_LAUNCH_REQUEST}. It
+ * is an immutable holder for the job-level values passed to its constructor.
+ */
+public class AMContainerEventLaunchRequest extends AMContainerEvent {
+
+  private final JobId jobId;
+  private final JobConf jobConf;
+  private final TaskType taskTypeForContainer;
+  private final Token<JobTokenIdentifier> jobToken;
+  private final Credentials credentials;
+  private final boolean shouldProfile;
+
+  /**
+   * @param containerId id of the container this event is addressed to
+   * @param jobId id of the job
+   * @param taskType task type associated with the container
+   * @param jobToken job token
+   * @param credentials credentials carried by the event
+   * @param shouldProfile profiling flag carried by the event
+   * @param jobConf job configuration
+   */
+  public AMContainerEventLaunchRequest(ContainerId containerId, JobId jobId,
+      TaskType taskType, Token<JobTokenIdentifier> jobToken,
+      Credentials credentials, boolean shouldProfile, JobConf jobConf) {
+    super(containerId, AMContainerEventType.C_LAUNCH_REQUEST);
+    this.jobId = jobId;
+    this.jobConf = jobConf;
+    this.taskTypeForContainer = taskType;
+    this.jobToken = jobToken;
+    this.credentials = credentials;
+    this.shouldProfile = shouldProfile;
+  }
+
+  /** @return the job id given at construction */
+  public JobId getJobId() {
+    return jobId;
+  }
+
+  /** @return the job configuration given at construction */
+  public JobConf getJobConf() {
+    return jobConf;
+  }
+
+  /** @return the task type given at construction */
+  public TaskType getTaskTypeForContainer() {
+    return taskTypeForContainer;
+  }
+
+  /** @return the job token given at construction */
+  public Token<JobTokenIdentifier> getJobToken() {
+    return jobToken;
+  }
+
+  /** @return the credentials given at construction */
+  public Credentials getCredentials() {
+    return credentials;
+  }
+
+  /** @return the profiling flag given at construction */
+  public boolean shouldProfile() {
+    return shouldProfile;
+  }
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunched.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunched.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunched.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventLaunched.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Container event of type {@link AMContainerEventType#C_LAUNCHED} carrying a
+ * shuffle port number.
+ */
+public class AMContainerEventLaunched extends AMContainerEvent {
+
+  private final int shufflePort;
+
+  /**
+   * @param containerId id of the container this event is addressed to
+   * @param shufflePort shuffle port carried by the event
+   */
+  public AMContainerEventLaunched(ContainerId containerId, int shufflePort) {
+    super(containerId, AMContainerEventType.C_LAUNCHED);
+    this.shufflePort = shufflePort;
+  }
+
+  /** @return the shuffle port given at construction */
+  public int getShufflePort() {
+    return shufflePort;
+  }
+
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventNodeFailed.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventNodeFailed.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventNodeFailed.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventNodeFailed.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import org.apache.hadoop.mapreduce.v2.common.DiagnosableEvent;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class AMContainerEventNodeFailed extends AMContainerEvent implements
+    DiagnosableEvent {
+
+  private final String message;
+
+  public AMContainerEventNodeFailed(ContainerId containerId, String message) {
+    super(containerId, AMContainerEventType.C_NODE_FAILED);
+    this.message = message;
+  }
+
+  @Override
+  public String getDiagnosticInfo() {
+    return message;
+  }
+
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventStopFailed.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventStopFailed.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventStopFailed.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventStopFailed.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Container event of type {@link AMContainerEventType#C_NM_STOP_FAILED}
+ * carrying a free-form message.
+ */
+public class AMContainerEventStopFailed extends AMContainerEvent {
+
+  private final String message;
+
+  /**
+   * @param containerId id of the container this event is addressed to
+   * @param message text attached to the event
+   */
+  public AMContainerEventStopFailed(ContainerId containerId, String message) {
+    super(containerId, AMContainerEventType.C_NM_STOP_FAILED);
+    this.message = message;
+  }
+
+  /** @return the message given at construction */
+  public String getMessage() {
+    return message;
+  }
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventTASucceeded.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventTASucceeded.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventTASucceeded.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventTASucceeded.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Container event of type {@link AMContainerEventType#C_TA_SUCCEEDED}
+ * carrying the id of a task attempt.
+ */
+public class AMContainerEventTASucceeded extends AMContainerEvent {
+
+  private final TaskAttemptId attemptId;
+
+  /**
+   * @param containerId id of the container this event is addressed to
+   * @param attemptId id of the task attempt carried by the event
+   */
+  public AMContainerEventTASucceeded(ContainerId containerId,
+      TaskAttemptId attemptId) {
+    super(containerId, AMContainerEventType.C_TA_SUCCEEDED);
+    this.attemptId = attemptId;
+  }
+
+  /** @return the task attempt id given at construction */
+  public TaskAttemptId getTaskAttemptId() {
+    return attemptId;
+  }
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventType.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventType.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventType.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,56 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+public enum AMContainerEventType {
+  // Event types passed to the AMContainerEvent constructor by its subclasses. Each group below is labelled with its producer.
+  // Producer: Scheduler
+  C_LAUNCH_REQUEST,
+  C_ASSIGN_TA,
+  
+  // Producer: NMCommunicator
+  C_LAUNCHED,
+  C_LAUNCH_FAILED,
+
+  // Producer: TAL (presumably the TaskAttemptListener - TODO confirm). PULL_TA is a sync call.
+  C_PULL_TA,
+
+  // Producer: Scheduler via TA
+  C_TA_SUCCEEDED, // TODO: maybe change this to C_TA_FINISHED with a status.
+
+  // Producer: RMCommunicator
+  C_COMPLETED,
+  
+  // Producer: RMCommunicator, AMNode
+  C_NODE_FAILED,
+  
+  // TODO ZZZ CREUSE: Consider introducing a new event C_NODE_BLACKLISTED -> the container can decide what to do if this event comes in.
+  
+  // Producer: TA -> Scheduler -> Container (in case of failure etc)
+  //           Scheduler -> Container (in case of pre-emption etc)
+  //           Node -> Container (in case of Node blacklisted etc)
+  C_STOP_REQUEST,
+  
+  // Producer: NMCommunicator
+  C_NM_STOP_FAILED,
+  C_NM_STOP_SENT,
+  
+  // Producer: ContainerHeartbeatHandler
+  C_TIMED_OUT,
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,273 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceChildJVM2;
+import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Static helpers that build the {@link ContainerLaunchContext} for a
+ * container. The fields that do not vary per container (local resources, base
+ * environment, service data, tokens) are built once on first use and cached in
+ * a static; the per-container fields are filled in on every call to
+ * {@code createContainerLaunchContext}.
+ */
+public class AMContainerHelpers {
+
+  private static final Log LOG = LogFactory.getLog(AMContainerHelpers.class);
+
+  // Guards lazy creation of commonContainerSpec. Declared final so the monitor
+  // that callers synchronize on can never be replaced.
+  private static final Object commonContainerSpecLock = new Object();
+  private static ContainerLaunchContext commonContainerSpec = null;
+  // Guards lazy computation of initialClasspath.
+  private static final Object classpathLock = new Object();
+  private static final AtomicBoolean initialClasspathFlag = new AtomicBoolean();
+  private static String initialClasspath = null;
+
+  /**
+   * Create a {@link LocalResource} record for the given file.
+   *
+   * @param fc file system used to stat and resolve {@code file}
+   * @param file path of the resource
+   * @param type resource type to record
+   * @param visibility resource visibility to record
+   * @return a record carrying the resolved URL, length and modification time
+   *         of {@code file}
+   * @throws IOException if the file status or resolved path cannot be obtained
+   */
+  private static LocalResource createLocalResource(FileSystem fc, Path file,
+      LocalResourceType type, LocalResourceVisibility visibility)
+      throws IOException {
+    FileStatus fstat = fc.getFileStatus(file);
+    URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
+        .getPath()));
+    long resourceSize = fstat.getLen();
+    long resourceModificationTime = fstat.getModificationTime();
+
+    return BuilderUtils.newLocalResource(resourceURL, type, visibility,
+        resourceSize, resourceModificationTime);
+  }
+
+  /**
+   * Return the initial class-path, computing it on the first call only. The
+   * computation is serialized on {@code classpathLock} so that there is only
+   * one fork in the AM for getting the initial class-path. TODO: We already
+   * construct a parent CLC and use it for all the containers, so this should
+   * go away once the mr-generated-classpath stuff is gone.
+   *
+   * @param conf configuration handed to {@code MRApps.setClasspath}
+   * @return the CLASSPATH value produced by {@code MRApps.setClasspath}; may
+   *         be null if that call did not set one
+   * @throws IOException if {@code MRApps.setClasspath} fails
+   */
+  private static String getInitialClasspath(Configuration conf)
+      throws IOException {
+    synchronized (classpathLock) {
+      if (initialClasspathFlag.get()) {
+        return initialClasspath;
+      }
+      Map<String, String> env = new HashMap<String, String>();
+      MRApps.setClasspath(env, conf);
+      initialClasspath = env.get(Environment.CLASSPATH.name());
+      // The flag, not a null check, marks completion because the computed
+      // class-path itself may be null.
+      initialClasspathFlag.set(true);
+      return initialClasspath;
+    }
+  }
+
+  /**
+   * Create the common {@link ContainerLaunchContext} for all attempts. The
+   * container id, resource and commands are left null here and are supplied
+   * per container by the caller.
+   *
+   * @param applicationACLs application ACLs stored in the context
+   * @param conf job configuration
+   * @param jobToken job token; stored in the task credentials and serialized
+   *          into the shuffle service data
+   * @param oldJobId job id used to locate the job submit directory
+   * @param credentials credentials copied into the task credentials when
+   *          security is enabled
+   * @return the partially filled launch context
+   * @throws YarnException wrapping any {@link IOException} raised while
+   *           building the context
+   */
+  private static ContainerLaunchContext createCommonContainerLaunchContext(
+      Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
+      Token<JobTokenIdentifier> jobToken,
+      final org.apache.hadoop.mapred.JobID oldJobId, Credentials credentials) {
+
+    // Application resources
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+
+    // Application environment
+    Map<String, String> environment = new HashMap<String, String>();
+
+    // Service data
+    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+
+    // Tokens
+    ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[] {});
+    try {
+      FileSystem remoteFS = FileSystem.get(conf);
+
+      // //////////// Set up JobJar to be localized properly on the remote NM.
+      String jobJar = conf.get(MRJobConfig.JAR);
+      if (jobJar != null) {
+        Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS.getUri(),
+            remoteFS.getWorkingDirectory());
+        localResources.put(
+            MRJobConfig.JOB_JAR,
+            createLocalResource(remoteFS, remoteJobJar, LocalResourceType.FILE,
+                LocalResourceVisibility.APPLICATION));
+        LOG.info("The job-jar file on the remote FS is "
+            + remoteJobJar.toUri().toASCIIString());
+      } else {
+        // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
+        // mapreduce jar itself which is already on the classpath.
+        LOG.info("Job jar is not present. "
+            + "Not adding any jar to the list of resources.");
+      }
+      // //////////// End of JobJar setup
+
+      // //////////// Set up JobConf to be localized properly on the remote NM.
+      Path path = MRApps.getStagingAreaDir(conf, UserGroupInformation
+          .getCurrentUser().getShortUserName());
+      Path remoteJobSubmitDir = new Path(path, oldJobId.toString());
+      Path remoteJobConfPath = new Path(remoteJobSubmitDir,
+          MRJobConfig.JOB_CONF_FILE);
+      localResources.put(
+          MRJobConfig.JOB_CONF_FILE,
+          createLocalResource(remoteFS, remoteJobConfPath,
+              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+      LOG.info("The job-conf file on the remote FS is "
+          + remoteJobConfPath.toUri().toASCIIString());
+      // //////////// End of JobConf setup
+
+      // Setup DistributedCache
+      MRApps.setupDistributedCache(conf, localResources);
+
+      // Set up task credentials buffer
+      Credentials taskCredentials = new Credentials();
+
+      if (UserGroupInformation.isSecurityEnabled()) {
+        LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #"
+            + credentials.numberOfSecretKeys()
+            + " secret keys for NM use for launching container");
+        taskCredentials.addAll(credentials);
+      }
+
+      // LocalStorageToken is needed irrespective of whether security is enabled
+      // or not.
+      TokenCache.setJobToken(jobToken, taskCredentials);
+
+      DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+      // Note: the value logged here is the number of tokens, not a byte size.
+      LOG.info("Size of containertokens_dob is "
+          + taskCredentials.numberOfTokens());
+      taskCredentials.writeTokenStorageToStream(containerTokens_dob);
+      // Wrap only the written prefix of the backing array.
+      taskCredentialsBuffer = ByteBuffer.wrap(containerTokens_dob.getData(), 0,
+          containerTokens_dob.getLength());
+
+      // Add shuffle token
+      LOG.info("Putting shuffle token in serviceData");
+      serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
+          ShuffleHandler.serializeServiceData(jobToken));
+
+      Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+          getInitialClasspath(conf));
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+
+    // Shell
+    environment.put(Environment.SHELL.name(), conf.get(
+        MRJobConfig.MAPRED_ADMIN_USER_SHELL, MRJobConfig.DEFAULT_SHELL));
+
+    // Add pwd to LD_LIBRARY_PATH, add this before adding anything else
+    Apps.addToEnvironment(environment, Environment.LD_LIBRARY_PATH.name(),
+        Environment.PWD.$());
+
+    // Add the env variables passed by the admin
+    Apps.setEnvFromInputString(environment, conf.get(
+        MRJobConfig.MAPRED_ADMIN_USER_ENV,
+        MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV));
+
+    // Construct the actual Container
+    // The null fields are per-container and will be constructed for each
+    // container separately.
+    ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
+        null, conf.get(MRJobConfig.USER_NAME), null, localResources,
+        environment, null, serviceData, taskCredentialsBuffer, applicationACLs);
+
+    return container;
+  }
+
+  /**
+   * Build the launch context for one container. The common part is created on
+   * the first call, from that call's arguments, and reused afterwards; the
+   * environment, service data and token buffer are copied per container.
+   *
+   * @param applicationACLs application ACLs stored in the returned context
+   * @param containerID container id stored in the returned context
+   * @param jobConf job configuration
+   * @param taskType task type used for the VM environment and command
+   * @param jobToken job token; only used when the common part is first built
+   * @param oldJobId job id; used for the common part and the launch command
+   * @param assignedCapability resource stored in the returned context
+   * @param containerId container id passed to the launch-command builder
+   * @param taskAttemptListener listener whose address is passed to the
+   *          launch-command builder
+   * @param credentials credentials; only used when the common part is first
+   *          built
+   * @param shouldProfile profiling flag passed to the launch-command builder
+   * @return a fully populated launch context for the container
+   */
+  @VisibleForTesting
+  public static ContainerLaunchContext createContainerLaunchContext(
+      Map<ApplicationAccessType, String> applicationACLs,
+      ContainerId containerID, JobConf jobConf, TaskType taskType,
+      Token<JobTokenIdentifier> jobToken,
+      final org.apache.hadoop.mapred.JobID oldJobId,
+      Resource assignedCapability, ContainerId containerId,
+      TaskAttemptListener taskAttemptListener, Credentials credentials,
+      boolean shouldProfile) {
+
+    // Capture the cached spec while holding the lock so that the rest of the
+    // method never re-reads the mutable static without synchronization.
+    final ContainerLaunchContext commonSpec;
+    synchronized (commonContainerSpecLock) {
+      if (commonContainerSpec == null) {
+        commonContainerSpec = createCommonContainerLaunchContext(
+            applicationACLs, jobConf, jobToken, oldJobId, credentials);
+      }
+      commonSpec = commonContainerSpec;
+    }
+
+    // Fill in the fields needed per-container that are missing in the common
+    // spec.
+
+    // Setup environment by cloning from common env.
+    Map<String, String> myEnv = new HashMap<String, String>(
+        commonSpec.getEnvironment());
+    MapReduceChildJVM2.setVMEnv(myEnv, jobConf, taskType);
+
+    // Set up the launch command
+    List<String> commands = MapReduceChildJVM2.getVMCommand(
+        taskAttemptListener.getAddress(), jobConf, taskType, containerId,
+        oldJobId, shouldProfile);
+
+    // Duplicate the ByteBuffers for access by multiple containers.
+    Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
+    for (Entry<String, ByteBuffer> entry : commonSpec.getServiceData()
+        .entrySet()) {
+      myServiceData.put(entry.getKey(), entry.getValue().duplicate());
+    }
+
+    // Construct the actual Container
+    ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
+        containerID, commonSpec.getUser(), assignedCapability,
+        commonSpec.getLocalResources(), myEnv, commands,
+        myServiceData, commonSpec.getContainerTokens().duplicate(),
+        applicationACLs);
+
+    return container;
+  }
+
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,915 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminated;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminating;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventNodeFailed;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventContainerCompleted;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorStopRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorContainerDeAllocateRequestEvent;
+import org.apache.hadoop.mapreduce.v2.common.DiagnosableEvent;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.tez.mapreduce.task.impl.MRTaskContext;
+
+@SuppressWarnings("rawtypes")
+public class AMContainerImpl implements AMContainer {
+
+  private static final Log LOG = LogFactory.getLog(AMContainerImpl.class);
+  
+  // Read and write views of the single ReentrantReadWriteLock created in the
+  // constructor. handle() and pullTaskContext() take the write lock; the
+  // state-reading getters take the read lock.
+  private final ReadLock readLock;
+  private final WriteLock writeLock;
+  // Cached from container.getId() in the constructor.
+  private final ContainerId containerId;
+  // Container to be used for getters on capability, locality etc.
+  private final Container container;
+  private final AppContext appContext;
+  private final ContainerHeartbeatHandler containerHeartbeatHandler;
+  private final TaskAttemptListener taskAttemptListener;
+  // Obtained from appContext.getEventHandler(); used by sendEvent().
+  protected final EventHandler eventHandler;
+
+  // Attempts that were running when a C_TA_SUCCEEDED event arrived, in order.
+  private final List<TaskAttemptId> completedAttempts = new LinkedList<TaskAttemptId>();
+
+  // Remote task context for each assigned attempt; the entry is removed when
+  // the attempt is handed out by pullTaskContext().
+  // TODO Maybe this should be pulled from the TaskAttempt.s
+  private final Map<TaskAttemptId, MRTaskContext> remoteTaskMap =
+      new HashMap<TaskAttemptId, MRTaskContext>();
+  
+  // TODO ?? Convert to list and hash.
+  
+  // Set from the C_LAUNCHED event.
+  private int shufflePort; 
+  // Accumulated time (ms) between a task finishing and the next one being
+  // pulled; updated by PullTAAtIdleTransition.
+  private long idleTimeBetweenTasks = 0;
+  // System.currentTimeMillis() at the most recent C_TA_SUCCEEDED; 0 until then.
+  private long lastTaskFinishTime;
+  
+  // An assign can happen even during wind down. e.g. NodeFailure caused the
+  // wind down, and an allocation was pending in the AMScheduler. This could
+  // be modelled as a separate state.
+  private boolean nodeFailed = false;
+  // Diagnostic text taken from the node-failure event when it is a
+  // DiagnosableEvent; otherwise left unset.
+  private String nodeFailedMessage;
+  
+  // Attempt assigned to this container but not yet pulled by it.
+  private TaskAttemptId pendingAttempt;
+  // Attempt most recently pulled and not yet reported as succeeded.
+  private TaskAttemptId runningAttempt;
+  // Iterated on node failure so these attempts are notified as well.
+  private List<TaskAttemptId> failedAssignments;
+  // Set by PullTAAtIdleTransition and cleared again by pullTaskContext().
+  private TaskAttemptId pullAttempt;
+  
+  // Set when an invalid or unexpected event has been seen.
+  private boolean inError = false;
+  
+  // Only populated for the duration of LaunchRequestTransition.
+  private ContainerLaunchContext clc;
+
+  // TODO Consider registering with the TAL, instead of the TAL pulling.
+  // Possibly after splitting TAL and ContainerListener.
+
+  // TODO What should be done with pendingAttempts. Nullify when handled ?
+  // Add them to failed ta list ? Some historic information should be maintained.
+
+  // TODO Create a generic ERROR state. Container tries informing relevant components in this case.
+
+  // Per-instance state machine, created from the shared factory in the
+  // constructor.
+  private final StateMachine<AMContainerState, AMContainerEventType, AMContainerEvent> stateMachine;
+  // Shared transition table for all AMContainerImpl instances. The state
+  // passed to the factory constructor is ALLOCATED. Each addTransition call
+  // reads: (current state, resulting state or set of possible resulting
+  // states, triggering event type(s), optional transition hook). Entries
+  // without a hook only change (or keep) the state.
+  private static final StateMachineFactory
+      <AMContainerImpl, AMContainerState, AMContainerEventType, AMContainerEvent> 
+      stateMachineFactory = 
+      new StateMachineFactory<AMContainerImpl, AMContainerState, AMContainerEventType, AMContainerEvent>(
+      AMContainerState.ALLOCATED)
+
+        // Transitions out of ALLOCATED. Everything except a launch request
+        // ends in COMPLETED.
+        .addTransition(AMContainerState.ALLOCATED, AMContainerState.LAUNCHING, AMContainerEventType.C_LAUNCH_REQUEST, new LaunchRequestTransition())
+        .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtAllocatedTransition())
+        .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtAllocatedTransition())
+        .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtAllocatedTransition())
+        .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtAllocatedTransition())
+        .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), new ErrorTransition())
+
+        // Transitions out of LAUNCHING.
+        .addTransition(AMContainerState.LAUNCHING, EnumSet.of(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED), AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptTransition())
+        .addTransition(AMContainerState.LAUNCHING, AMContainerState.IDLE, AMContainerEventType.C_LAUNCHED, new LaunchedTransition())
+        .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_FAILED, new LaunchFailedTransition())
+        // TODO CREUSE : Maybe, consider sending back an attempt if the container asks for one in this state. Waiting for a LAUNCHED event from the NMComm may delay the task allocation.
+        .addTransition(AMContainerState.LAUNCHING, AMContainerState.LAUNCHING, AMContainerEventType.C_PULL_TA) // Is assuming the pullAttempt will be null.
+        .addTransition(AMContainerState.LAUNCHING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtLaunchingTransition())
+        .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtLaunchingTransition())
+        .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtLaunchingTransition())
+        .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), new ErrorAtLaunchingTransition())
+
+        // Transitions out of IDLE. A pull moves to RUNNING only when an
+        // attempt is pending (see PullTAAtIdleTransition).
+        .addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED), AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtIdleTransition())
+        .addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.RUNNING, AMContainerState.IDLE), AMContainerEventType.C_PULL_TA, new PullTAAtIdleTransition())
+        .addTransition(AMContainerState.IDLE, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtIdleTransition())
+        .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtIdleTransition())
+        .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, new TimedOutAtIdleTransition())
+        .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtIdleTransition())
+        .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), new ErrorAtIdleTransition())
+        
+        // Transitions out of RUNNING. A second assignment while an attempt
+        // is running moves the container to STOP_REQUESTED.
+        .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtRunningTransition())
+        .addTransition(AMContainerState.RUNNING, AMContainerState.RUNNING, AMContainerEventType.C_PULL_TA)
+        .addTransition(AMContainerState.RUNNING, AMContainerState.IDLE, AMContainerEventType.C_TA_SUCCEEDED, new TASucceededAtRunningTransition())
+        .addTransition(AMContainerState.RUNNING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtRunningTransition())
+        .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtRunningTransition())
+        .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, new TimedOutAtRunningTransition())
+        .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtRunningTransition())
+        .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), new ErrorAtRunningTransition())
+
+        // Transitions out of STOP_REQUESTED. Most late events are accepted
+        // without a hook and leave the state unchanged.
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition())
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_SENT)
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_FAILED, new NMStopRequestFailedTransition())
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtNMStopRequestedTransition())
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TIMED_OUT))
+        .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_LAUNCH_REQUEST, new ErrorAtNMStopRequestedTransition())
+        
+        // Transitions out of STOPPING. Only C_COMPLETED leaves this state.
+        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
+        .addTransition(AMContainerState.STOPPING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition())
+        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
+        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
+        .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_REQUEST, new ErrorAtStoppingTransition())
+        
+        // COMPLETED has no outgoing transitions to other states; every
+        // listed event keeps the container in COMPLETED.
+        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtCompletedTransition())
+        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
+        .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_COMPLETED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
+
+        .installTopology();
+
+  // Note: Containers will not reach their final state if the RM link is broken,
+  // AM shutdown should not wait for this.
+
+  // Attempting to use a container based purely on resources required, etc.
+  // needs additional changes - JvmID, YarnChild, etc. depend on TaskType.
+  public AMContainerImpl(Container container, ContainerHeartbeatHandler chh,
+      TaskAttemptListener tal, AppContext appContext) {
+    ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+    this.readLock = rwLock.readLock();
+    this.writeLock = rwLock.writeLock();
+    this.container = container;
+    this.containerId = container.getId();
+    this.eventHandler = appContext.getEventHandler();
+    this.appContext = appContext;
+    this.containerHeartbeatHandler = chh;
+    this.taskAttemptListener = tal;
+    this.failedAssignments = new LinkedList<TaskAttemptId>();
+
+    this.stateMachine = stateMachineFactory.make(this);
+  }
+  
+  /**
+   * Returns the state machine's current state, read under the read lock.
+   */
+  @Override
+  public AMContainerState getState() {
+    final AMContainerState current;
+    readLock.lock();
+    try {
+      current = stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+    return current;
+  }
+
+  /**
+   * Returns the id cached from the container at construction time. The field
+   * is final, so no lock is taken.
+   */
+  @Override
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+  
+  /**
+   * Returns the container handed to the constructor. The field is final, so
+   * no lock is taken.
+   */
+  @Override
+  public Container getContainer() {
+    return container;
+  }
+
+  /**
+   * Returns a fresh copy of the completed attempts, taken under the read
+   * lock, so callers may modify the result freely.
+   */
+  @Override
+  public List<TaskAttemptId> getCompletedTaskAttempts() {
+    readLock.lock();
+    try {
+      final List<TaskAttemptId> snapshot = new ArrayList<TaskAttemptId>();
+      snapshot.addAll(completedAttempts);
+      return snapshot;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Returns a fresh list holding the completed attempts followed by the
+   * running attempt and then the pending attempt, skipping either of the
+   * latter two when unset. Built under the read lock.
+   */
+  @Override
+  public List<TaskAttemptId> getAllTaskAttempts() {
+    readLock.lock();
+    try {
+      final List<TaskAttemptId> attempts = new ArrayList<TaskAttemptId>();
+      attempts.addAll(completedAttempts);
+      // Running first, then pending - same order as before.
+      final TaskAttemptId[] active = {runningAttempt, pendingAttempt};
+      for (TaskAttemptId candidate : active) {
+        if (candidate != null) {
+          attempts.add(candidate);
+        }
+      }
+      return attempts;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the attempts assigned to this container but not yet pulled by it.
+   * At most one attempt can be pending, so the result holds zero or one
+   * element. Read under the read lock.
+   *
+   * @return an immutable list with the pending attempt, or an empty list when
+   *         nothing is pending (never a list containing {@code null})
+   */
+  @Override
+  public List<TaskAttemptId> getQueuedTaskAttempts() {
+    readLock.lock();
+    try {
+      if (this.pendingAttempt == null) {
+        // singletonList(null) would hand callers a list with a null entry.
+        return Collections.<TaskAttemptId>emptyList();
+      }
+      return Collections.singletonList(this.pendingAttempt);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the attempt currently recorded as running, or {@code null} when
+   * there is none. Read under the read lock.
+   */
+  @Override
+  public TaskAttemptId getRunningTaskAttempt() {
+    final TaskAttemptId current;
+    readLock.lock();
+    try {
+      current = runningAttempt;
+    } finally {
+      readLock.unlock();
+    }
+    return current;
+  }
+  
+  /**
+   * Returns the shuffle port recorded from the launched event, read under the
+   * read lock.
+   */
+  @Override
+  public int getShufflePort() {
+    final int port;
+    readLock.lock();
+    try {
+      port = shufflePort;
+    } finally {
+      readLock.unlock();
+    }
+    return port;
+  }
+
+  /**
+   * Feeds an event into the state machine while holding the write lock.
+   * An event that is invalid for the current state is logged and flags the
+   * container as being in error; it is not rethrown.
+   *
+   * @param event the container event to process
+   */
+  @Override
+  public void handle(AMContainerEvent event) {
+    this.writeLock.lock();
+    // Everything after lock() runs inside the try, so a failure while logging
+    // can no longer leave the write lock held.
+    try {
+      final AMContainerState oldState = getState();
+      LOG.info("DEBUG: Processing ContainerEvent: " + event.getContainerId() + " of type "
+          + event.getType() + " while in state: " + oldState);
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle event " + event.getType()
+            + " at current state " + oldState + " for ContainerId "
+            + this.containerId, e);
+        inError = true;
+        // TODO Can't set state to COMPLETED. Add a default error state.
+      }
+      final AMContainerState newState = getState();
+      if (oldState != newState) {
+        LOG.info("AMContainer " + this.containerId + " transitioned from "
+            + oldState + " to " + newState);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+  
+  /**
+   * Forwards an event to the handler obtained from the application context.
+   * The handler field is a raw type, hence the unchecked suppression.
+   */
+  @SuppressWarnings("unchecked")
+  private void sendEvent(Event<?> event) {
+    eventHandler.handle(event);
+  }
+
+  /**
+   * Fires a C_PULL_TA event at this container and returns the task context
+   * of whichever attempt the transition selected, removing it from the
+   * remote task map. The selection is always cleared before returning.
+   *
+   * Design note kept from the original: push the TaskAttempt to the TAL,
+   * instead of the TAL pulling when a JVM asks for a TaskAttempt.
+   *
+   * @return the context of the pulled attempt, or {@code null} if the
+   *         transition did not select one
+   */
+  public MRTaskContext pullTaskContext() {
+    writeLock.lock();
+    try {
+      final AMContainerEvent pullEvent =
+          new AMContainerEvent(containerId, AMContainerEventType.C_PULL_TA);
+      handle(pullEvent);
+      if (pullAttempt == null) {
+        return null;
+      }
+      return remoteTaskMap.remove(pullAttempt);
+    } finally {
+      pullAttempt = null;
+      writeLock.unlock();
+    }
+  }
+
+
+  //////////////////////////////////////////////////////////////////////////////
+  //                   Start of Transition Classes                            //
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * ALLOCATED + C_LAUNCH_REQUEST: builds the launch context from the event,
+   * registers with the task attempt listener and issues the start request.
+   */
+  protected static class LaunchRequestTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      final AMContainerEventLaunchRequest launchRequest =
+          (AMContainerEventLaunchRequest) cEvent;
+
+      container.clc = AMContainerHelpers.createContainerLaunchContext(
+          container.appContext.getApplicationACLs(),
+          container.getContainerId(),
+          launchRequest.getJobConf(),
+          launchRequest.getTaskTypeForContainer(),
+          launchRequest.getJobToken(),
+          TypeConverter.fromYarn(launchRequest.getJobId()),
+          container.getContainer().getResource(),
+          container.containerId,
+          container.taskAttemptListener,
+          launchRequest.getCredentials(),
+          launchRequest.shouldProfile());
+
+      container.registerWithTAListener();
+      container.sendStartRequestToNM();
+      LOG.info("Sending Launch Request for Container with id: " +
+          container.clc.getContainerId());
+
+      // The launch context is dropped once the request is out, to save
+      // resources. Part of it may eventually need to be exposed to the
+      // scheduler so it can decide whether a container suits a TaskAttempt.
+      container.clc = null;
+    }
+  }
+
+  /**
+   * ALLOCATED + C_ASSIGN_TA: an attempt was assigned before the container
+   * was launched. Flags the container as in error, notifies the attempt and
+   * the scheduler, and de-allocates the container.
+   */
+  protected static class AssignTaskAttemptAtAllocatedTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      final TaskAttemptId assignedId =
+          ((AMContainerEventAssignTA) cEvent).getTaskAttemptId();
+
+      container.inError = true;
+      container.maybeSendNodeFailureForFailedAssignment(assignedId);
+
+      final String diagnostics =
+          "AMScheduler Error: TaskAttempt allocated to unlaunched container: " +
+              container.getContainerId();
+      container.sendTerminatedToTaskAttempt(assignedId, diagnostics);
+
+      container.sendCompletedToScheduler();
+      container.deAllocate();
+
+      LOG.warn("Unexpected TA Assignment: TAId: " + assignedId +
+          "  for ContainerId: " + container.getContainerId() +
+          " while in state: " + container.getState());
+    }
+  }
+
+  /**
+   * ALLOCATED + C_COMPLETED: only the scheduler is told about the
+   * completion.
+   */
+  protected static class CompletedAtAllocatedTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(
+        AMContainerImpl container, AMContainerEvent cEvent) {
+      // The event itself carries nothing this transition needs.
+      container.sendCompletedToScheduler();
+    }
+  }
+
+  /**
+   * ALLOCATED + C_STOP_REQUEST: reports completion to the scheduler and then
+   * de-allocates the container.
+   */
+  protected static class StopRequestAtAllocatedTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(
+        AMContainerImpl container, AMContainerEvent cEvent) {
+      // Scheduler first, then release the allocation.
+      container.sendCompletedToScheduler();
+      container.deAllocate();
+    }
+  }
+
+  /**
+   * ALLOCATED + C_NODE_FAILED: records the node failure (and its diagnostic
+   * text when the event provides one), reports completion to the scheduler
+   * and de-allocates the container.
+   */
+  protected static class NodeFailedAtAllocatedTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      container.nodeFailed = true;
+
+      final boolean hasDiagnostics = cEvent instanceof DiagnosableEvent;
+      if (hasDiagnostics) {
+        final DiagnosableEvent diagnosable = (DiagnosableEvent) cEvent;
+        container.nodeFailedMessage = diagnosable.getDiagnosticInfo();
+      }
+
+      container.sendCompletedToScheduler();
+      container.deAllocate();
+    }
+  }
+
+  /**
+   * Handles an event that is not expected in the current state: reports
+   * completion to the scheduler, de-allocates the container and logs the
+   * offending event.
+   */
+  protected static class ErrorTransition extends ErrorBaseTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      container.sendCompletedToScheduler();
+      container.deAllocate();
+
+      final StringBuilder message = new StringBuilder();
+      message.append("Unexpected event type: ").append(cEvent.getType());
+      message.append(" while in state: ").append(container.getState());
+      message.append(". Event: ").append(cEvent);
+      LOG.info(message.toString());
+    }
+  }
+
+  /**
+   * C_ASSIGN_TA while launching (and, via a subclass, while idle). The first
+   * assignment is stored as the pending attempt and the state is unchanged;
+   * an assignment while one is already pending is treated as an error and
+   * moves the container to STOP_REQUESTED.
+   */
+  protected static class AssignTaskAttemptTransition implements
+      MultipleArcTransition<AMContainerImpl, AMContainerEvent, AMContainerState> {
+
+    @Override
+    public AMContainerState transition(
+        AMContainerImpl container, AMContainerEvent cEvent) {
+      final AMContainerEventAssignTA assignEvent =
+          (AMContainerEventAssignTA) cEvent;
+
+      if (container.pendingAttempt == null) {
+        // Normal case: remember the attempt and its remote task context.
+        container.pendingAttempt = assignEvent.getTaskAttemptId();
+        LOG.info("DEBUG: AssignTA: attempt: " +
+            assignEvent.getRemoteTaskContext());
+        container.remoteTaskMap.put(
+            assignEvent.getTaskAttemptId(),
+            assignEvent.getRemoteTaskContext());
+        return container.getState();
+      }
+
+      // This may include a couple of additional (harmless) unregister calls
+      // to the taskAttemptListener and containerHeartbeatHandler - in case
+      // of assign at any state prior to IDLE.
+      container.handleExtraTAAssign(assignEvent, container.pendingAttempt);
+      // TODO XXX: Verify that it's ok to send in a NM_STOP_REQUEST. The
+      // NMCommunicator should be able to handle this. The STOP_REQUEST would
+      // only go out after the START_REQUEST.
+      return AMContainerState.STOP_REQUESTED;
+    }
+  }
+
+  /**
+   * LAUNCHING + C_LAUNCHED: records the shuffle port reported by the event
+   * and registers with the container listener.
+   */
+  protected static class LaunchedTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      final AMContainerEventLaunched launched =
+          (AMContainerEventLaunched) cEvent;
+      final int reportedPort = launched.getShufflePort();
+      container.shufflePort = reportedPort;
+      container.registerWithContainerListener();
+    }
+  }
+
+  /**
+   * LAUNCHING + C_LAUNCH_FAILED: tells a pending attempt (if any) that it is
+   * being terminated, using the failure message from the event, then
+   * unregisters from the task attempt listener and de-allocates.
+   */
+  protected static class LaunchFailedTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      final TaskAttemptId waiting = container.pendingAttempt;
+      if (waiting != null) {
+        // The cast is only performed when there is someone to notify.
+        final String reason =
+            ((AMContainerEventLaunchFailed) cEvent).getMessage();
+        container.sendTerminatingToTA(waiting, reason);
+      }
+      container.unregisterFromTAListener();
+      container.deAllocate();
+    }
+  }
+
+  /**
+   * LAUNCHING + C_COMPLETED: the container finished before the launch was
+   * confirmed. A pending attempt (if any) is told it has been terminated,
+   * then the container unregisters from the task attempt listener and
+   * reports completion to the scheduler.
+   *
+   * Subclasses override {@link #getMessage} to change the diagnostic text.
+   */
+  protected static class CompletedAtLaunchingTransition
+      implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      // getState() still returns the pre-transition state at this point.
+      LOG.info(
+          "Container with id: " + container.getContainerId() + " Completed." +
+              " Previous state was: " + container.getState());
+      if (container.pendingAttempt != null) {
+        String errorMessage = getMessage(container, cEvent);
+        container.sendTerminatedToTaskAttempt(container.pendingAttempt,
+            errorMessage);
+        LOG.warn(errorMessage);
+      }
+      container.unregisterFromTAListener();
+      container.sendCompletedToScheduler();
+    }
+
+    /**
+     * Builds the diagnostic text sent to the affected attempt.
+     *
+     * @param container the container that completed
+     * @param event the triggering event (unused here)
+     * @return the diagnostic message
+     */
+    public String getMessage(
+        AMContainerImpl container, AMContainerEvent event) {
+      return "Container " + container.getContainerId() +
+          " COMPLETED while trying to launch";
+    }
+  }
+
+  /**
+   * LAUNCHING + C_STOP_REQUEST: notifies a pending attempt (if any) that it
+   * is being terminated, unregisters from the task attempt listener and
+   * sends a stop request. Subclasses override {@link #getMessage} to change
+   * the diagnostic text.
+   */
+  protected static class StopRequestAtLaunchingTransition
+      implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      final TaskAttemptId waiting = container.pendingAttempt;
+      if (waiting != null) {
+        final String reason = getMessage(container, cEvent);
+        container.sendTerminatingToTA(waiting, reason);
+      }
+      container.unregisterFromTAListener();
+      container.sendStopRequestToNM();
+    }
+
+    /**
+     * Builds the diagnostic text sent to the pending attempt.
+     */
+    public String getMessage(
+        AMContainerImpl container, AMContainerEvent event) {
+      final StringBuilder text = new StringBuilder("Container ");
+      text.append(container.getContainerId());
+      text.append(" received a STOP_REQUEST");
+      return text.toString();
+    }
+  }
+
+  /**
+   * Common handling for C_NODE_FAILED. Records the failure, then sends a
+   * node-failure notification to every failed-assignment, completed, pending
+   * and running attempt. The pending and running attempts are additionally
+   * told that they are being terminated.
+   */
+  protected static class NodeFailedBaseTransition
+      implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      container.nodeFailed = true;
+
+      // Diagnostic text stays null unless the event can supply one.
+      final String diagnostics;
+      if (cEvent instanceof DiagnosableEvent) {
+        diagnostics = ((DiagnosableEvent) cEvent).getDiagnosticInfo();
+        container.nodeFailedMessage = diagnostics;
+      } else {
+        diagnostics = null;
+      }
+
+      for (TaskAttemptId failedId : container.failedAssignments) {
+        container.sendNodeFailureToTA(failedId, diagnostics);
+      }
+      for (TaskAttemptId doneId : container.completedAttempts) {
+        container.sendNodeFailureToTA(doneId, diagnostics);
+      }
+
+      failActiveAttempt(container, container.pendingAttempt, diagnostics);
+      failActiveAttempt(container, container.runningAttempt, diagnostics);
+    }
+
+    // Notifies a still-active attempt of the node failure and its termination.
+    private static void failActiveAttempt(AMContainerImpl container,
+        TaskAttemptId attemptId, String diagnostics) {
+      if (attemptId == null) {
+        return;
+      }
+      container.sendNodeFailureToTA(attemptId, diagnostics);
+      container.sendTerminatingToTA(attemptId, "Node failure");
+    }
+  }
+
+  /**
+   * LAUNCHING + C_NODE_FAILED: performs the common node-failure handling,
+   * then unregisters from the task attempt listener and de-allocates.
+   */
+  protected static class NodeFailedAtLaunchingTransition
+      extends NodeFailedBaseTransition {
+    @Override
+    public void transition(
+        AMContainerImpl container, AMContainerEvent cEvent) {
+      // Attempt notifications come first, via the base class.
+      super.transition(container, cEvent);
+
+      container.unregisterFromTAListener();
+      container.deAllocate();
+    }
+  }
+
+  /**
+   * LAUNCHING + unexpected event: performs the generic error handling, tells
+   * a pending attempt (if any) that it has been terminated, and unregisters
+   * from the task attempt listener.
+   */
+  protected static class ErrorAtLaunchingTransition
+      extends ErrorTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+
+      final TaskAttemptId waiting = container.pendingAttempt;
+      if (waiting != null) {
+        final String diagnostics = "Container " + container.getContainerId() +
+            " hit an invalid transition - " + cEvent.getType() + " at " +
+            container.getState();
+        container.sendTerminatedToTaskAttempt(waiting, diagnostics);
+      }
+
+      container.unregisterFromTAListener();
+    }
+  }
+
+  /**
+   * IDLE + C_ASSIGN_TA: logs the incoming remote task context and then
+   * applies the same handling as an assignment while launching.
+   */
+  protected static class AssignTaskAttemptAtIdleTransition
+      extends AssignTaskAttemptTransition {
+    @Override
+    public AMContainerState transition(
+        AMContainerImpl container, AMContainerEvent cEvent) {
+      final AMContainerEventAssignTA assignEvent =
+          (AMContainerEventAssignTA) cEvent;
+      LOG.info("DEBUG: AssignTAAtIdle: attempt: " +
+          assignEvent.getRemoteTaskContext());
+
+      final AMContainerState next = super.transition(container, cEvent);
+      return next;
+    }
+  }
+
+  /**
+   * IDLE + C_PULL_TA: if an attempt is pending it becomes the running
+   * attempt, is exposed through {@code pullAttempt} for pullTaskContext() to
+   * pick up, and the container moves to RUNNING. With nothing pending the
+   * container stays IDLE.
+   *
+   * The idle time since the previous task finished is measured once, so the
+   * logged increment and the accumulated value always agree.
+   */
+  protected static class PullTAAtIdleTransition implements
+      MultipleArcTransition<AMContainerImpl, AMContainerEvent, AMContainerState> {
+
+    @Override
+    public AMContainerState transition(
+        AMContainerImpl container, AMContainerEvent cEvent) {
+      if (container.pendingAttempt != null) {
+        // This will be invoked as part of the PULL_REQUEST - so pullAttempt
+        // should ideally only end up being populated during the duration of this call,
+        // which is in a write lock. pullRequest() should move this to the running state.
+        container.pullAttempt = container.pendingAttempt;
+        container.runningAttempt = container.pendingAttempt;
+        container.pendingAttempt = null;
+        // lastTaskFinishTime is 0 until the first task has succeeded.
+        if (container.lastTaskFinishTime != 0) {
+          long idleTimeDiff =
+              System.currentTimeMillis() - container.lastTaskFinishTime;
+          LOG.info("DEBUG: Computing idle time for container: " +
+              container.getContainerId() + ", lastFinishTime: " +
+              container.lastTaskFinishTime + ", Incremented by: " +
+              idleTimeDiff);
+          // Reuse the measured value instead of reading the clock again.
+          container.idleTimeBetweenTasks += idleTimeDiff;
+        }
+        LOG.info("Assigned taskAttempt + [" + container.runningAttempt +
+            "] to container: [" + container.getContainerId() + "]");
+        return AMContainerState.RUNNING;
+      } else {
+        return AMContainerState.IDLE;
+      }
+    }
+  }
+
+  /**
+   * IDLE + C_COMPLETED: applies the completed-while-launching handling and
+   * additionally unregisters from the container listener.
+   */
+  protected static class CompletedAtIdleTransition
+      extends CompletedAtLaunchingTransition {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      container.unregisterFromContainerListener();
+    }
+
+    /**
+     * Builds the diagnostic text sent to the affected attempt.
+     *
+     * @param container the container that completed
+     * @param event the triggering event (unused here)
+     * @return the diagnostic message
+     */
+    @Override
+    public String getMessage(
+        AMContainerImpl container, AMContainerEvent event) {
+      // Separator added so the id and the word do not run together.
+      return "Container " + container.getContainerId() + " COMPLETED";
+    }
+  }
+
+  /**
+   * IDLE + C_STOP_REQUEST: applies the stop-while-launching handling, logs
+   * the accumulated idle time and unregisters from the container listener.
+   */
+  protected static class StopRequestAtIdleTransition
+      extends StopRequestAtLaunchingTransition {
+
+    @Override
+    public void transition(
+        AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+
+      final long idleMillis = container.idleTimeBetweenTasks;
+      LOG.info("DEBUG: IdleTimeBetweenTasks: " + idleMillis);
+
+      container.unregisterFromContainerListener();
+    }
+  }
+
+  /**
+   * IDLE + C_TIMED_OUT: identical to a stop request while idle, except that
+   * the diagnostic text reports a timeout.
+   */
+  protected static class TimedOutAtIdleTransition
+      extends StopRequestAtIdleTransition {
+
+    /**
+     * Builds the diagnostic text sent to the pending attempt.
+     *
+     * @param container the container that timed out
+     * @param event the triggering event (unused here)
+     * @return the diagnostic message
+     */
+    @Override
+    public String getMessage(
+        AMContainerImpl container, AMContainerEvent event) {
+      return "Container " + container.getContainerId() +
+          " timed out";
+    }
+  }
+
+  /**
+   * IDLE + C_NODE_FAILED: applies the node-failed-while-launching handling
+   * and additionally unregisters from the container listener.
+   */
+  protected static class NodeFailedAtIdleTransition
+      extends NodeFailedAtLaunchingTransition {
+
+    @Override
+    public void transition(
+        AMContainerImpl container, AMContainerEvent cEvent) {
+      // Inherited handling first, listener cleanup afterwards.
+      super.transition(container, cEvent);
+
+      container.unregisterFromContainerListener();
+    }
+  }
+
+  /**
+   * IDLE + unexpected event: applies the error-while-launching handling and
+   * additionally unregisters from the container listener.
+   */
+  protected static class ErrorAtIdleTransition
+      extends ErrorAtLaunchingTransition {
+    @Override
+    public void transition(
+        AMContainerImpl container, AMContainerEvent cEvent) {
+      // Inherited handling first, listener cleanup afterwards.
+      super.transition(container, cEvent);
+
+      container.unregisterFromContainerListener();
+    }
+  }
+
+  /**
+   * RUNNING + C_ASSIGN_TA: a second attempt was assigned while one is still
+   * running. The running attempt is unregistered from the listener and the
+   * extra assignment is handed to handleExtraTAAssign().
+   */
+  protected static class AssignTaskAttemptAtRunningTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      final AMContainerEventAssignTA extraAssign =
+          (AMContainerEventAssignTA) cEvent;
+      final TaskAttemptId current = container.runningAttempt;
+
+      container.unregisterAttemptFromListener(current);
+      container.handleExtraTAAssign(extraAssign, current);
+    }
+  }
+
+  /**
+   * Bookkeeping for a successfully finished attempt: records the finish time,
+   * moves the running attempt to the completed list, unregisters it from the
+   * task attempt listener and clears the running slot.
+   */
+  protected static class TASucceededAtRunningTransition
+      implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      final TaskAttemptId finishedAttempt = container.runningAttempt;
+      container.lastTaskFinishTime = System.currentTimeMillis();
+      container.completedAttempts.add(finishedAttempt);
+      container.unregisterAttemptFromListener(finishedAttempt);
+      // The container no longer has an attempt in flight.
+      container.runningAttempt = null;
+    }
+  }
+
+  /**
+   * Completion handling while an attempt is running: the running attempt is
+   * told the container terminated and is unregistered from the task attempt
+   * listener before the idle-state completion handling runs.
+   */
+  protected static class CompletedAtRunningTransition
+      extends CompletedAtIdleTransition {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      final TaskAttemptId activeAttempt = container.runningAttempt;
+      final String diagnostics = getMessage(container, cEvent);
+      container.sendTerminatedToTaskAttempt(activeAttempt, diagnostics);
+      container.unregisterAttemptFromListener(container.runningAttempt);
+      super.transition(container, cEvent);
+    }
+  }
+
+  /**
+   * Stop-request handling while an attempt is running: the running attempt is
+   * unregistered from the task attempt listener and told the container is
+   * terminating before the idle-state stop handling runs.
+   */
+  protected static class StopRequestAtRunningTransition
+      extends StopRequestAtIdleTransition {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+
+      container.unregisterAttemptFromListener(container.runningAttempt);
+      // NOTE(review): this text is fixed, so subclasses that override
+      // getMessage() do not change what the running attempt is told here;
+      // confirm that is intended.
+      container.sendTerminatingToTA(container.runningAttempt,
+          "Container " + container.getContainerId() +
+              " received a STOP_REQUEST");
+      super.transition(container, cEvent);
+    }
+  }
+
+  /**
+   * Timeout handling while an attempt is running. Reuses the running
+   * stop-request transition and only replaces the diagnostic message.
+   */
+  protected static class TimedOutAtRunningTransition
+      extends StopRequestAtRunningTransition {
+
+    /**
+     * @param container the container whose id is embedded in the message
+     * @param event the triggering event; not consulted when building the text
+     * @return a message of the form {@code Container <id> timed out}
+     */
+    @Override
+    public String getMessage(
+        AMContainerImpl container, AMContainerEvent event) {
+      final String prefix = "Container " + container.getContainerId();
+      return prefix + " timed out";
+    }
+  }
+
+  protected static class NodeFailedAtRunningTransition
+      extends NodeFailedAtIdleTransition {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      container.unregisterAttemptFromListener(container.runningAttempt);
+    }
+  }
+
+  /**
+   * Error handling while an attempt is running: runs the idle-state error
+   * handling, unregisters the running attempt from the task attempt listener
+   * and tells it the container terminated because of an invalid transition.
+   */
+  protected static class ErrorAtRunningTransition
+      extends ErrorAtIdleTransition {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      container.unregisterAttemptFromListener(container.runningAttempt);
+
+      // Build the diagnostics after the cleanup above so the reported state is
+      // read at the same point as before.
+      final TaskAttemptId activeAttempt = container.runningAttempt;
+      final String diagnostics = "Container " + container.getContainerId() +
+          " hit an invalid transition - " + cEvent.getType() + " at " +
+          container.getState();
+      container.sendTerminatedToTaskAttempt(activeAttempt, diagnostics);
+    }
+  }
+
+  /**
+   * Rejects a task attempt assignment that reaches a container which is
+   * winding down: the container is flagged as in error, the attempt is told
+   * the container is terminating and the failed assignment is recorded.
+   */
+  protected static class AssignTAAtWindDownTransition
+      implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      final AMContainerEventAssignTA assignEvent =
+          (AMContainerEventAssignTA) cEvent;
+      container.inError = true;
+
+      final TaskAttemptId rejectedAttempt = assignEvent.getTaskAttemptId();
+      final String errorMessage = "AttemptId: " + rejectedAttempt +
+          " cannot be allocated to container: " + container.getContainerId() +
+          " in " + container.getState() + " state";
+
+      container.maybeSendNodeFailureForFailedAssignment(rejectedAttempt);
+      container.sendTerminatingToTA(rejectedAttempt, errorMessage);
+      container.registerFailedTAAssignment(rejectedAttempt);
+    }
+  }
+
+  /**
+   * Completion handling for a container that is winding down: every attempt
+   * still associated with the container (pending, running, and failed
+   * assignments) is told the container terminated, then the scheduler is
+   * informed of the completion.
+   */
+  protected static class CompletedAtWindDownTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      final TaskAttemptId pending = container.pendingAttempt;
+      if (pending != null) {
+        container.sendTerminatedToTaskAttempt(pending, null);
+      }
+
+      final TaskAttemptId running = container.runningAttempt;
+      if (running != null) {
+        container.sendTerminatedToTaskAttempt(running, null);
+      }
+
+      for (TaskAttemptId failedAssignment : container.failedAssignments) {
+        container.sendTerminatedToTaskAttempt(failedAssignment, null);
+      }
+
+      container.sendCompletedToScheduler();
+    }
+  }
+
+  protected static class NMStopRequestFailedTransition
+      implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      container.deAllocate();
+    }
+  }
+
+  /**
+   * Node-failure handling after an NM stop request was issued: runs the base
+   * node-failure handling and then issues a de-allocate request for the
+   * container.
+   */
+  protected static class NodeFailedAtNMStopRequestedTransition
+      extends NodeFailedBaseTransition {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      container.deAllocate();
+    }
+  }
+
+  /**
+   * Error handling after an NM stop request was issued: runs the
+   * stopping-state error handling and then issues a de-allocate request for
+   * the container.
+   */
+  protected static class ErrorAtNMStopRequestedTransition
+      extends ErrorAtStoppingTransition {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      container.deAllocate();
+    }
+  }
+
+  /**
+   * Error handling for a stopping container: flags the container as in error
+   * (via the base transition), tells every attempt still associated with the
+   * container (pending, running, and failed assignments) that the container
+   * terminated, and informs the scheduler of the completion.
+   */
+  protected static class ErrorAtStoppingTransition
+      extends ErrorBaseTransition {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      super.transition(container, cEvent);
+      if (container.pendingAttempt != null) {
+        container.sendTerminatedToTaskAttempt(container.pendingAttempt, null);
+      }
+      if (container.runningAttempt != null) {
+        container.sendTerminatedToTaskAttempt(container.runningAttempt, null);
+      }
+      for (TaskAttemptId taId : container.failedAssignments) {
+        container.sendTerminatedToTaskAttempt(taId, null);
+      }
+      container.sendCompletedToScheduler();
+    }
+  }
+
+  /**
+   * Base error transition: only marks the container as being in error.
+   * Subclasses add the state-specific cleanup.
+   */
+  protected static class ErrorBaseTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+
+    @Override
+    public void transition(
+        final AMContainerImpl container, final AMContainerEvent cEvent) {
+      container.inError = true;
+    }
+  }
+
+  /**
+   * Rejects a task attempt assignment that reaches an already completed
+   * container: the container is flagged as in error, the attempt is told the
+   * container terminated and the failed assignment is recorded.
+   */
+  protected static class AssignTAAtCompletedTransition implements
+      SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+
+    @Override
+    public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      // TODO CREUSE CRITICAL: This is completely incorrect. COMPLETED comes
+      // from RMComm directly to the container. Meanwhile, the scheduler may
+      // think the container is still around and assign a task to it. The task
+      // ends up getting a CONTAINER_KILLED message. Task could handle this by
+      // asking for a reschedule in this case. Will end up FAILING the task instead of KILLING it.
+      container.inError = true;
+      final AMContainerEventAssignTA assignEvent =
+          (AMContainerEventAssignTA) cEvent;
+
+      final TaskAttemptId rejectedAttempt = assignEvent.getTaskAttemptId();
+      final String errorMessage = "AttemptId: " + rejectedAttempt
+          + " cannot be allocated to container: " + container.getContainerId()
+          + " in COMPLETED state";
+
+      container.maybeSendNodeFailureForFailedAssignment(rejectedAttempt);
+      container.sendTerminatedToTaskAttempt(rejectedAttempt, errorMessage);
+      container.registerFailedTAAssignment(rejectedAttempt);
+    }
+  }
+
+
+  private void handleExtraTAAssign(
+      AMContainerEventAssignTA event, TaskAttemptId currentTaId) {
+    this.inError = true;
+    String errorMessage = "AMScheduler Error: Multiple simultaneous " +
+        "taskAttempt allocations to: " + this.getContainerId() +
+        ". Attempts: " + currentTaId + ", " + event.getTaskAttemptId() +
+        ". Current state: " + this.getState();
+    this.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
+    this.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage);
+    this.sendTerminatingToTA(currentTaId, errorMessage);
+    this.registerFailedTAAssignment(event.getTaskAttemptId());
+    LOG.warn(errorMessage);
+    this.sendStopRequestToNM();
+    this.unregisterFromTAListener();
+    this.unregisterFromContainerListener();
+  }
+
+
+  /**
+   * Records a task attempt whose assignment to this container was rejected.
+   *
+   * @param taId the rejected task attempt
+   */
+  protected void registerFailedTAAssignment(TaskAttemptId taId) {
+    this.failedAssignments.add(taId);
+  }
+  
+  protected void deAllocate() {
+    sendEvent(new RMCommunicatorContainerDeAllocateRequestEvent(containerId));
+  }
+  
+  protected void sendCompletedToScheduler() {
+    sendEvent(new AMSchedulerEventContainerCompleted(containerId));
+  }
+
+  protected void sendTerminatedToTaskAttempt(
+      TaskAttemptId taId, String message) {
+    sendEvent(new TaskAttemptEventContainerTerminated(taId, message));
+  }
+
+  protected void sendTerminatingToTA(TaskAttemptId taId, String message) {
+    sendEvent(new TaskAttemptEventContainerTerminating(taId, message));
+  }
+
+  /**
+   * Forwards the recorded node-failure message to a task attempt, but only
+   * when this container's node has been marked as failed.
+   *
+   * @param taId the task attempt to notify
+   */
+  protected void maybeSendNodeFailureForFailedAssignment(TaskAttemptId taId) {
+    if (!nodeFailed) {
+      return;
+    }
+    sendNodeFailureToTA(taId, this.nodeFailedMessage);
+  }
+  
+  protected void sendNodeFailureToTA(TaskAttemptId taId, String message) {
+    sendEvent(new TaskAttemptEventNodeFailed(taId, message));
+  }
+
+  protected void sendStartRequestToNM() {
+    sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container));
+  }
+
+  protected void sendStopRequestToNM() {
+    sendEvent(new NMCommunicatorStopRequestEvent(containerId,
+        container.getNodeId(), container.getContainerToken()));
+  }
+  
+  /**
+   * Unregisters a task attempt from the task attempt listener.
+   *
+   * @param attemptId the attempt to unregister
+   */
+  protected void unregisterAttemptFromListener(TaskAttemptId attemptId) {
+    this.taskAttemptListener.unregisterTaskAttempt(attemptId);
+  }
+
+  /**
+   * Registers this container as a running container with the task attempt
+   * listener.
+   */
+  protected void registerWithTAListener() {
+    this.taskAttemptListener.registerRunningContainer(this.containerId);
+  }
+
+  /**
+   * Removes this container's running-container registration from the task
+   * attempt listener.
+   */
+  protected void unregisterFromTAListener() {
+    taskAttemptListener.unregisterRunningContainer(this.containerId);
+  }
+
+
+  /** Registers this container with the container heartbeat handler. */
+  protected void registerWithContainerListener() {
+    containerHeartbeatHandler.register(containerId);
+  }
+
+  /** Unregisters this container from the container heartbeat handler. */
+  protected void unregisterFromContainerListener() {
+    containerHeartbeatHandler.unregister(containerId);
+  }
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ * Service that keeps the {@link AMContainer} instances known to the
+ * application, keyed by {@link ContainerId}, and routes each
+ * {@link AMContainerEvent} to the container it names.
+ */
+public class AMContainerMap extends AbstractService implements
+    EventHandler<AMContainerEvent> {
+
+  private static final Log LOG = LogFactory.getLog(AMContainerMap.class);
+
+  private final ContainerHeartbeatHandler chh;
+  private final TaskAttemptListener tal;
+  private final AppContext context;
+  private final ConcurrentHashMap<ContainerId, AMContainer> containerMap;
+
+  /**
+   * @param chh heartbeat handler handed to every container created here
+   * @param tal task attempt listener handed to every container created here
+   * @param context application context handed to every container created here
+   */
+  public AMContainerMap(ContainerHeartbeatHandler chh, TaskAttemptListener tal,
+      AppContext context) {
+    super("AMContainerMaps");
+    this.chh = chh;
+    this.tal = tal;
+    this.context = context;
+    this.containerMap = new ConcurrentHashMap<ContainerId, AMContainer>();
+  }
+
+  /**
+   * Forwards the event to the container it refers to. Events that name a
+   * container which was never added are logged and dropped rather than
+   * failing with a {@link NullPointerException}.
+   *
+   * @param event the container event to route
+   */
+  @Override
+  public void handle(AMContainerEvent event) {
+    ContainerId containerId = event.getContainerId();
+    AMContainer container = containerMap.get(containerId);
+    if (container == null) {
+      // The map returns null for ids that were never registered.
+      LOG.warn("Ignoring event " + event.getType()
+          + " for unknown container: " + containerId);
+      return;
+    }
+    container.handle(event);
+  }
+
+  /**
+   * Registers a container unless one with the same id is already present; an
+   * existing entry is left untouched.
+   *
+   * @param container the allocated container to track
+   */
+  public void addContainerIfNew(Container container) {
+    AMContainer amc = new AMContainerImpl(container, chh, tal, context);
+    containerMap.putIfAbsent(container.getId(), amc);
+  }
+
+  /**
+   * @param containerId the id to look up
+   * @return the matching container, or {@code null} if none is registered
+   */
+  public AMContainer get(ContainerId containerId) {
+    return containerMap.get(containerId);
+  }
+
+  /**
+   * @return the values view of the underlying map
+   */
+  public Collection<AMContainer> values() {
+    return containerMap.values();
+  }
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+public enum AMContainerState {
+  ALLOCATED,
+  LAUNCHING,
+  IDLE,
+  RUNNING,
+  // indicates a NM stop request has been attempted. This request could fail, in
+  // which case an RM stop request needs to be sent.
+  STOP_REQUESTED, 
+
+  // A stop request has been registered with YARN
+  STOPPING,
+  COMPLETED,
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNode.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNode.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNode.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNode.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.node;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+/**
+ * View of a node that also accepts {@link AMNodeEvent}s.
+ */
+public interface AMNode extends EventHandler<AMNodeEvent> {
+
+  /** @return the id of this node */
+  NodeId getNodeId();
+
+  /** @return the current state of this node */
+  AMNodeState getState();
+
+  /** @return the ids of the containers associated with this node */
+  List<ContainerId> getContainers();
+
+  /** @return whether this node is considered unhealthy */
+  boolean isUnhealthy();
+
+  /** @return whether this node is blacklisted */
+  boolean isBlacklisted();
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.node;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Base event for node-related events; carries the id of the node concerned.
+ */
+public class AMNodeEvent extends AbstractEvent<AMNodeEventType> {
+
+  private final NodeId nodeId;
+
+  /**
+   * @param nodeId the node this event refers to
+   * @param type the kind of node event
+   */
+  public AMNodeEvent(NodeId nodeId, AMNodeEventType type) {
+    super(type);
+    this.nodeId = nodeId;
+  }
+
+  /** @return the node id supplied at construction */
+  public NodeId getNodeId() {
+    return nodeId;
+  }
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventContainerAllocated.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventContainerAllocated.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventContainerAllocated.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventContainerAllocated.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.node;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * Node event of type {@code N_CONTAINER_ALLOCATED}; carries the id of the
+ * allocated container in addition to the node id.
+ */
+public class AMNodeEventContainerAllocated extends AMNodeEvent {
+
+  private final ContainerId containerId;
+
+  /**
+   * @param nodeId the node the event refers to
+   * @param containerId the container carried by this event
+   */
+  public AMNodeEventContainerAllocated(NodeId nodeId, ContainerId containerId) {
+    super(nodeId, AMNodeEventType.N_CONTAINER_ALLOCATED);
+    this.containerId = containerId;
+  }
+
+  /** @return the container id supplied at construction */
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventNodeCountUpdated.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventNodeCountUpdated.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventNodeCountUpdated.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventNodeCountUpdated.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.node;
+
+/**
+ * Node event of type {@code N_NODE_COUNT_UPDATED}; carries a node count.
+ * The event is not tied to a single node, so its node id is {@code null}.
+ */
+public class AMNodeEventNodeCountUpdated extends AMNodeEvent {
+
+  private final int count;
+
+  /**
+   * @param nodeCount the node count carried by this event
+   */
+  public AMNodeEventNodeCountUpdated(int nodeCount) {
+    super(null, AMNodeEventType.N_NODE_COUNT_UPDATED);
+    this.count = nodeCount;
+  }
+
+  /** @return the node count supplied at construction */
+  public int getNodeCount() {
+    return count;
+  }
+}