You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/05/12 22:49:42 UTC

[25/36] hadoop git commit: MAPREDUCE-5465. Tasks are often killed before they exit on their own. Contributed by Ming Ma

MAPREDUCE-5465. Tasks are often killed before they exit on their own. Contributed by Ming Ma

Conflicts:
	hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5625ac46
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5625ac46
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5625ac46

Branch: refs/heads/YARN-2928
Commit: 5625ac469a4d7ba4d936a0002c0fe61828c8eedb
Parents: aa3e32d
Author: Jason Lowe <jl...@apache.org>
Authored: Mon May 11 22:37:35 2015 +0000
Committer: Zhijie Shen <zj...@apache.org>
Committed: Tue May 12 13:44:13 2015 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../hadoop/mapred/LocalContainerLauncher.java   |  12 +-
 .../hadoop/mapreduce/v2/app/AppContext.java     |   2 +
 .../hadoop/mapreduce/v2/app/MRAppMaster.java    |  39 +-
 .../v2/app/TaskAttemptFinishingMonitor.java     |  63 +++
 .../v2/app/client/MRClientService.java          |   2 +-
 .../v2/app/job/TaskAttemptStateInternal.java    |  39 +-
 .../v2/app/job/event/TaskAttemptEventType.java  |   3 +
 .../v2/app/job/impl/TaskAttemptImpl.java        | 445 +++++++++++++++----
 .../v2/app/launcher/ContainerLauncher.java      |   8 +-
 .../v2/app/launcher/ContainerLauncherImpl.java  |  11 +-
 .../mapred/TestTaskAttemptFinishingMonitor.java | 108 +++++
 .../apache/hadoop/mapreduce/v2/app/MRApp.java   |  16 +
 .../hadoop/mapreduce/v2/app/MockAppContext.java |   6 +
 .../hadoop/mapreduce/v2/app/TestFail.java       |   2 +
 .../hadoop/mapreduce/v2/app/TestKill.java       | 142 ++++--
 .../mapreduce/v2/app/TestRuntimeEstimators.java |   5 +
 .../v2/app/job/impl/TestTaskAttempt.java        | 263 ++++++++++-
 .../apache/hadoop/mapreduce/MRJobConfig.java    |  10 +-
 .../src/main/resources/mapred-default.xml       |  20 +
 .../hadoop/mapreduce/v2/hs/JobHistory.java      |   6 +
 .../v2/TestSpeculativeExecutionWithMRApp.java   |   6 +-
 22 files changed, 1060 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 2152be0..e28d575 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -426,6 +426,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6353. Divide by zero error in MR AM when calculating available 
     containers. (Anubhav Dhoot via kasha)
 
+    MAPREDUCE-5465. Tasks are often killed before they exit on their own
+    (Ming Ma via jlowe)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
index ffc5326..52b3497 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
@@ -264,7 +264,8 @@ public class LocalContainerLauncher extends AbstractService implements
           context.getEventHandler().handle(
               new TaskAttemptEvent(taId,
                   TaskAttemptEventType.TA_CONTAINER_CLEANED));
-
+        } else if (event.getType() == EventType.CONTAINER_COMPLETED) {
+          LOG.debug("Container completed " + event.toString());
         } else {
           LOG.warn("Ignoring unexpected event " + event.toString());
         }
@@ -314,7 +315,14 @@ public class LocalContainerLauncher extends AbstractService implements
         }
         runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
                    (numReduceTasks > 0), localMapFiles);
-        
+
+        // In non-uber mode, TA gets TA_CONTAINER_COMPLETED from MRAppMaster
+        // as part of NM -> RM -> AM notification route.
+        // In uber mode, since the task runs inside the MRAppMaster container,
+        // we have to simulate the notification.
+        context.getEventHandler().handle(new TaskAttemptEvent(attemptID,
+            TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+
       } catch (RuntimeException re) {
         JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
         jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
index 31e282a..4af11c3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
@@ -67,4 +67,6 @@ public interface AppContext {
   boolean hasSuccessfullyUnregistered();
 
   String getNMHostname();
+
+  TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 9272c7a..fa0a432 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -208,6 +208,14 @@ public class MRAppMaster extends CompositeService {
   private SpeculatorEventDispatcher speculatorEventDispatcher;
   private AMPreemptionPolicy preemptionPolicy;
 
+  // After a task attempt completes from TaskUmbilicalProtocol's point of view,
+  // it will be transitioned to finishing state.
+  // taskAttemptFinishingMonitor is just a timer for attempts in finishing
+  // state. If the attempt stays in finishing state for too long,
+  // taskAttemptFinishingMonitor will notify the attempt via TA_TIMED_OUT
+  // event.
+  private TaskAttemptFinishingMonitor taskAttemptFinishingMonitor;
+
   private Job job;
   private Credentials jobCredentials = new Credentials(); // Filled during init
   protected UserGroupInformation currentUser; // Will be setup during init
@@ -250,6 +258,12 @@ public class MRAppMaster extends CompositeService {
     logSyncer = TaskLog.createLogSyncer();
     LOG.info("Created MRAppMaster for application " + applicationAttemptId);
   }
+  protected TaskAttemptFinishingMonitor createTaskAttemptFinishingMonitor(
+      EventHandler eventHandler) {
+    TaskAttemptFinishingMonitor monitor =
+        new TaskAttemptFinishingMonitor(eventHandler);
+    return monitor;
+  }
 
   @Override
   protected void serviceInit(final Configuration conf) throws Exception {
@@ -260,7 +274,11 @@ public class MRAppMaster extends CompositeService {
 
     initJobCredentialsAndUGI(conf);
 
-    context = new RunningAppContext(conf);
+    dispatcher = createDispatcher();
+    addIfService(dispatcher);
+    taskAttemptFinishingMonitor = createTaskAttemptFinishingMonitor(dispatcher.getEventHandler());
+    addIfService(taskAttemptFinishingMonitor);
+    context = new RunningAppContext(conf, taskAttemptFinishingMonitor);
 
     // Job name is the same as the app name util we support DAG of jobs
     // for an app later
@@ -327,9 +345,6 @@ public class MRAppMaster extends CompositeService {
     }
     
     if (errorHappenedShutDown) {
-      dispatcher = createDispatcher();
-      addIfService(dispatcher);
-      
       NoopEventHandler eater = new NoopEventHandler();
       //We do not have a JobEventDispatcher in this path
       dispatcher.register(JobEventType.class, eater);
@@ -376,9 +391,6 @@ public class MRAppMaster extends CompositeService {
     } else {
       committer = createOutputCommitter(conf);
 
-      dispatcher = createDispatcher();
-      addIfService(dispatcher);
-
       //service to handle requests from JobClient
       clientService = createClientService(context);
       // Init ClientService separately so that we stop it separately, since this
@@ -967,10 +979,14 @@ public class MRAppMaster extends CompositeService {
     private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
     private TimelineClient timelineClient = null;
 
-    public RunningAppContext(Configuration config) {
+    private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor;
+
+    public RunningAppContext(Configuration config,
+        TaskAttemptFinishingMonitor taskAttemptFinishingMonitor) {
       this.conf = config;
       this.clientToAMTokenSecretManager =
           new ClientToAMTokenSecretManager(appAttemptID, null);
+      this.taskAttemptFinishingMonitor = taskAttemptFinishingMonitor;
       if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
               MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)
             && conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
@@ -1072,7 +1088,12 @@ public class MRAppMaster extends CompositeService {
     public String getNMHostname() {
       return nmHost;
     }
-    
+
+    @Override
+    public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
+      return taskAttemptFinishingMonitor;
+    }
+
     // Get Timeline Collector's address (get sync from RM)
     public TimelineClient getTimelineClient() {
       return timelineClient;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptFinishingMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptFinishingMonitor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptFinishingMonitor.java
new file mode 100644
index 0000000..f603398
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptFinishingMonitor.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+/**
+ * This class generates TA_TIMED_OUT if the task attempt stays in FINISHING
+ * state for too long. Both intervals are read from the job conf in init().
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class TaskAttemptFinishingMonitor extends
+    AbstractLivelinessMonitor<TaskAttemptId> {
+
+  private EventHandler eventHandler; // raw type; receives the events built in expire()
+
+  public TaskAttemptFinishingMonitor(EventHandler eventHandler) {
+    super("TaskAttemptFinishingMonitor", new SystemClock()); // name and clock go to the base class
+    this.eventHandler = eventHandler;
+  }
+
+  public void init(Configuration conf) { // NOTE(review): presumably overrides an inherited init(); no @Override - confirm
+    super.init(conf); // NOTE(review): runs before the intervals below are set - confirm the base init does not need them
+    int expireIntvl = conf.getInt(MRJobConfig.TASK_EXIT_TIMEOUT, // falls back to the _DEFAULT constant
+        MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
+    int checkIntvl = conf.getInt( // how often to check; key name suggests milliseconds - TODO confirm
+        MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS,
+        MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS_DEFAULT);
+
+    setExpireInterval(expireIntvl); // hand both values to the base-class setters
+    setMonitorInterval(checkIntvl);
+  }
+
+  @Override
+  protected void expire(TaskAttemptId id) { // presumably called by the base monitor for an expired id - verify
+    eventHandler.handle( // forward a TA_TIMED_OUT event for this attempt to the handler
+        new TaskAttemptEvent(id,
+        TaskAttemptEventType.TA_TIMED_OUT));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
index ceb1dbf..d378b0a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
@@ -370,7 +370,7 @@ public class MRClientService extends AbstractService implements ClientService {
           new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
       appContext.getEventHandler().handle(
           new TaskAttemptEvent(taskAttemptId, 
-              TaskAttemptEventType.TA_FAILMSG));
+              TaskAttemptEventType.TA_FAILMSG_BY_CLIENT));
       FailTaskAttemptResponse response = recordFactory.
         newRecordInstance(FailTaskAttemptResponse.class);
       return response;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java
index f6c3e57..5f17651 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java
@@ -30,9 +30,42 @@ public enum TaskAttemptStateInternal {
   UNASSIGNED, 
   ASSIGNED, 
   RUNNING, 
-  COMMIT_PENDING, 
-  SUCCESS_CONTAINER_CLEANUP, 
-  SUCCEEDED, 
+  COMMIT_PENDING,
+
+  // Transition into SUCCESS_FINISHING_CONTAINER
+  // After the attempt finishes successfully from
+  // TaskUmbilicalProtocol's point of view, it will transition to
+  // SUCCESS_FINISHING_CONTAINER state. That will give a chance for the
+  // container to exit by itself. In the transition,
+  // the attempt will notify the task via T_ATTEMPT_SUCCEEDED so that
+  // from the job's point of view, the task is considered succeeded.
+
+  // Transition out of SUCCESS_FINISHING_CONTAINER
+  // The attempt will transition from SUCCESS_FINISHING_CONTAINER to
+  // SUCCESS_CONTAINER_CLEANUP if it doesn't receive container exit
+  // notification within TASK_EXIT_TIMEOUT;
+  // Or it will transition to SUCCEEDED if it receives container exit
+  // notification from YARN.
+  SUCCESS_FINISHING_CONTAINER,
+
+  // Transition into FAIL_FINISHING_CONTAINER
+  // After the attempt fails from
+  // TaskUmbilicalProtocol's point of view, it will transition to
+  // FAIL_FINISHING_CONTAINER state. That will give a chance for the container
+  // to exit by itself. In the transition,
+  // the attempt will notify the task via T_ATTEMPT_FAILED so that
+  // from the job's point of view, the task is considered failed.
+
+  // Transition out of FAIL_FINISHING_CONTAINER
+  // The attempt will transition from FAIL_FINISHING_CONTAINER to
+  // FAIL_CONTAINER_CLEANUP if it doesn't receive container exit
+  // notification within TASK_EXIT_TIMEOUT;
+  // Or it will transition to FAILED if it receives container exit
+  // notification from YARN.
+  FAIL_FINISHING_CONTAINER,
+
+  SUCCESS_CONTAINER_CLEANUP,
+  SUCCEEDED,
   FAIL_CONTAINER_CLEANUP, 
   FAIL_TASK_CLEANUP, 
   FAILED, 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
index 1f05ac3..61de032 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
@@ -49,6 +49,9 @@ public enum TaskAttemptEventType {
   TA_TIMED_OUT,
   TA_PREEMPTED,
 
+  //Producer:Client
+  TA_FAILMSG_BY_CLIENT,
+
   //Producer:TaskCleaner
   TA_CLEANUP_DONE,
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index f4b434b..7e82df2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -184,8 +184,20 @@ public abstract class TaskAttemptImpl implements
   private Locality locality;
   private Avataar avataar;
 
-  private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION =
-    new CleanupContainerTransition();
+  private static final CleanupContainerTransition
+      CLEANUP_CONTAINER_TRANSITION = new CleanupContainerTransition();
+  private static final MoveContainerToSucceededFinishingTransition
+      SUCCEEDED_FINISHING_TRANSITION =
+          new MoveContainerToSucceededFinishingTransition();
+  private static final MoveContainerToFailedFinishingTransition
+      FAILED_FINISHING_TRANSITION =
+          new MoveContainerToFailedFinishingTransition();
+  private static final ExitFinishingOnTimeoutTransition
+      FINISHING_ON_TIMEOUT_TRANSITION =
+          new ExitFinishingOnTimeoutTransition();
+
+  private static final FinalizeFailedTransition FINALIZE_FAILED_TRANSITION =
+      new FinalizeFailedTransition();
 
   private static final DiagnosticInformationUpdater 
     DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION 
@@ -204,6 +216,8 @@ public abstract class TaskAttemptImpl implements
       TaskAttemptEventType.TA_COMMIT_PENDING,
       TaskAttemptEventType.TA_DONE,
       TaskAttemptEventType.TA_FAILMSG,
+      TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
+      TaskAttemptEventType.TA_TIMED_OUT,
       TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE);
 
   private static final StateMachineFactory
@@ -221,16 +235,16 @@ public abstract class TaskAttemptImpl implements
      .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED,
          TaskAttemptEventType.TA_KILL, new KilledTransition())
      .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
-         TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
+         TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, new FailedTransition())
      .addTransition(TaskAttemptStateInternal.NEW,
          EnumSet.of(TaskAttemptStateInternal.FAILED,
              TaskAttemptStateInternal.KILLED,
              TaskAttemptStateInternal.SUCCEEDED),
          TaskAttemptEventType.TA_RECOVER, new RecoverTransition())
      .addTransition(TaskAttemptStateInternal.NEW,
-          TaskAttemptStateInternal.NEW,
-          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
-          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+         TaskAttemptStateInternal.NEW,
+         TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+         DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
 
      // Transitions from the UNASSIGNED state.
      .addTransition(TaskAttemptStateInternal.UNASSIGNED,
@@ -238,14 +252,14 @@ public abstract class TaskAttemptImpl implements
          new ContainerAssignedTransition())
      .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED,
          TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(
-             TaskAttemptStateInternal.KILLED, true))
+         TaskAttemptStateInternal.KILLED, true))
      .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED,
-         TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition(
+         TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, new DeallocateContainerTransition(
              TaskAttemptStateInternal.FAILED, true))
      .addTransition(TaskAttemptStateInternal.UNASSIGNED,
-          TaskAttemptStateInternal.UNASSIGNED,
-          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
-          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+         TaskAttemptStateInternal.UNASSIGNED,
+         TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+         DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
 
      // Transitions from the ASSIGNED state.
      .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING,
@@ -258,15 +272,19 @@ public abstract class TaskAttemptImpl implements
          TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
          new DeallocateContainerTransition(TaskAttemptStateInternal.FAILED, false))
      .addTransition(TaskAttemptStateInternal.ASSIGNED,
-         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_CONTAINER_COMPLETED,
-         CLEANUP_CONTAINER_TRANSITION)
+         FINALIZE_FAILED_TRANSITION)
      .addTransition(TaskAttemptStateInternal.ASSIGNED, 
          TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION)
-     .addTransition(TaskAttemptStateInternal.ASSIGNED, 
+     .addTransition(TaskAttemptStateInternal.ASSIGNED,
+         TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+         TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION)
+     .addTransition(TaskAttemptStateInternal.ASSIGNED,
          TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
-         TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
+         TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
+             CLEANUP_CONTAINER_TRANSITION)
 
      // Transitions from RUNNING state.
      .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
@@ -274,23 +292,27 @@ public abstract class TaskAttemptImpl implements
      .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-     // If no commit is required, task directly goes to success
+     // If no commit is required, task goes to finishing state
+     // This will give a chance for the container to exit by itself
      .addTransition(TaskAttemptStateInternal.RUNNING,
-         TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
-         TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
+         TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         TaskAttemptEventType.TA_DONE, SUCCEEDED_FINISHING_TRANSITION)
      // If commit is required, task goes through commit pending state.
      .addTransition(TaskAttemptStateInternal.RUNNING,
          TaskAttemptStateInternal.COMMIT_PENDING,
          TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition())
      // Failure handling while RUNNING
      .addTransition(TaskAttemptStateInternal.RUNNING,
+         TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+         TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION)
+     .addTransition(TaskAttemptStateInternal.RUNNING,
          TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
-         TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
+         TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, CLEANUP_CONTAINER_TRANSITION)
       //for handling container exit without sending the done or fail msg
      .addTransition(TaskAttemptStateInternal.RUNNING,
-         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_CONTAINER_COMPLETED,
-         CLEANUP_CONTAINER_TRANSITION)
+         FINALIZE_FAILED_TRANSITION)
      // Timeout handling while RUNNING
      .addTransition(TaskAttemptStateInternal.RUNNING,
          TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
@@ -301,12 +323,97 @@ public abstract class TaskAttemptImpl implements
          TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
      // Kill handling
      .addTransition(TaskAttemptStateInternal.RUNNING,
-         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
+         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+         TaskAttemptEventType.TA_KILL,
          CLEANUP_CONTAINER_TRANSITION)
      .addTransition(TaskAttemptStateInternal.RUNNING,
          TaskAttemptStateInternal.KILLED,
          TaskAttemptEventType.TA_PREEMPTED, new PreemptedTransition())
 
+     // Transitions from SUCCESS_FINISHING_CONTAINER state
+     // When the container exits by itself, the notification of container
+     // completed event will be routed via NM -> RM -> AM.
+     // After MRAppMaster gets notification from RM, it will generate
+     // TA_CONTAINER_COMPLETED event.
+     .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         TaskAttemptStateInternal.SUCCEEDED,
+         TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+         new ExitFinishingOnContainerCompletedTransition())
+     // Given TA notifies task T_ATTEMPT_SUCCEEDED when it transitions to
+     // SUCCESS_FINISHING_CONTAINER, it is possible to receive the event
+     // TA_CONTAINER_CLEANED in the following scenario.
+     // 1. It is the last task for the job.
+     // 2. After the task receives T_ATTEMPT_SUCCEEDED, it will notify job.
+     // 3. Job will be marked completed.
+     // 4. As part of MRAppMaster's shutdown, all containers will be killed.
+     .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         TaskAttemptStateInternal.SUCCEEDED,
+         TaskAttemptEventType.TA_CONTAINER_CLEANED,
+         new ExitFinishingOnContainerCleanedupTransition())
+     // The client wants to kill the task. Given the task is in finishing
+     // state, it could go to succeeded state or killed state. If it is a
+     // reducer, it will go to succeeded state;
+     // otherwise, it goes to killed state.
+     .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         EnumSet.of(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
+             TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP),
+         TaskAttemptEventType.TA_KILL,
+         new KilledAfterSucceededFinishingTransition())
+     // The attempt stays in finishing state for too long
+     // Let us clean up the container
+     .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
+         TaskAttemptEventType.TA_TIMED_OUT, FINISHING_ON_TIMEOUT_TRANSITION)
+     .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+         DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+     // ignore-able events
+     .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         EnumSet.of(TaskAttemptEventType.TA_UPDATE,
+             TaskAttemptEventType.TA_DONE,
+             TaskAttemptEventType.TA_COMMIT_PENDING,
+             TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_FAILMSG_BY_CLIENT))
+
+     // Transitions from FAIL_FINISHING_CONTAINER state
+     // When the container exits by itself, the notification of container
+     // completed event will be routed via NM -> RM -> AM.
+     // After MRAppMaster gets notification from RM, it will generate
+     // TA_CONTAINER_COMPLETED event.
+    .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+        TaskAttemptStateInternal.FAILED,
+        TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+        new ExitFinishingOnContainerCompletedTransition())
+     // Given TA notifies task T_ATTEMPT_FAILED when it transitions to
+     // FAIL_FINISHING_CONTAINER, it is possible to receive the event
+     // TA_CONTAINER_CLEANED in the following scenario.
+     // 1. It is the last task attempt for the task.
+     // 2. After the task receives T_ATTEMPT_FAILED, it will notify job.
+     // 3. Job will be marked failed.
+     // 4. As part of MRAppMaster's shutdown, all containers will be killed.
+    .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+        TaskAttemptStateInternal.FAILED,
+        TaskAttemptEventType.TA_CONTAINER_CLEANED,
+        new ExitFinishingOnContainerCleanedupTransition())
+    .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+        TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+        TaskAttemptEventType.TA_TIMED_OUT, FINISHING_ON_TIMEOUT_TRANSITION)
+    .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+        TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+        TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+        DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+        // ignore-able events
+    .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+        TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+        EnumSet.of(TaskAttemptEventType.TA_KILL,
+            TaskAttemptEventType.TA_UPDATE,
+            TaskAttemptEventType.TA_DONE,
+            TaskAttemptEventType.TA_COMMIT_PENDING,
+            TaskAttemptEventType.TA_FAILMSG,
+            TaskAttemptEventType.TA_FAILMSG_BY_CLIENT))
+
      // Transitions from COMMIT_PENDING state
      .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
          TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE,
@@ -316,22 +423,27 @@ public abstract class TaskAttemptImpl implements
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
      .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
-         TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
-         TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
+         TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+         TaskAttemptEventType.TA_DONE, SUCCEEDED_FINISHING_TRANSITION)
      .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
-         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
+         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+         TaskAttemptEventType.TA_KILL,
          CLEANUP_CONTAINER_TRANSITION)
      // if container killed by AM shutting down
      .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
          TaskAttemptStateInternal.KILLED,
          TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
      .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
-         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
-         TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
+         TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+         TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION)
      .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
          TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
+             CLEANUP_CONTAINER_TRANSITION)
+     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_CONTAINER_COMPLETED,
-         CLEANUP_CONTAINER_TRANSITION)
+         FINALIZE_FAILED_TRANSITION)
      .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
          TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
@@ -348,8 +460,8 @@ public abstract class TaskAttemptImpl implements
      // Transitions from SUCCESS_CONTAINER_CLEANUP state
      // kill and cleanup the container
      .addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
-         TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED,
-         new SucceededTransition())
+         TaskAttemptStateInternal.SUCCEEDED,
+         TaskAttemptEventType.TA_CONTAINER_CLEANED)
      .addTransition(
           TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
           TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
@@ -360,6 +472,7 @@ public abstract class TaskAttemptImpl implements
          TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
              TaskAttemptEventType.TA_TIMED_OUT,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED))
 
@@ -383,6 +496,7 @@ public abstract class TaskAttemptImpl implements
              TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
              TaskAttemptEventType.TA_TIMED_OUT))
 
       // Transitions from KILL_CONTAINER_CLEANUP
@@ -405,6 +519,7 @@ public abstract class TaskAttemptImpl implements
              TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
              TaskAttemptEventType.TA_TIMED_OUT))
 
      // Transitions from FAIL_TASK_CLEANUP
@@ -425,6 +540,7 @@ public abstract class TaskAttemptImpl implements
              TaskAttemptEventType.TA_COMMIT_PENDING,
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
              TaskAttemptEventType.TA_CONTAINER_CLEANED,
              // Container launch events can arrive late
              TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
@@ -447,6 +563,7 @@ public abstract class TaskAttemptImpl implements
              TaskAttemptEventType.TA_COMMIT_PENDING,
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
              TaskAttemptEventType.TA_CONTAINER_CLEANED,
              TaskAttemptEventType.TA_PREEMPTED,
              // Container launch events can arrive late
@@ -460,7 +577,7 @@ public abstract class TaskAttemptImpl implements
          new TooManyFetchFailureTransition())
       .addTransition(TaskAttemptStateInternal.SUCCEEDED,
           EnumSet.of(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED),
-          TaskAttemptEventType.TA_KILL, 
+          TaskAttemptEventType.TA_KILL,
           new KilledAfterSuccessTransition())
      .addTransition(
          TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED,
@@ -470,6 +587,10 @@ public abstract class TaskAttemptImpl implements
      .addTransition(TaskAttemptStateInternal.SUCCEEDED,
          TaskAttemptStateInternal.SUCCEEDED,
          EnumSet.of(TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
+             // TaskAttemptFinishingMonitor might time out the attempt right
+             // after the attempt receives TA_CONTAINER_COMPLETED.
+             TaskAttemptEventType.TA_TIMED_OUT,
              TaskAttemptEventType.TA_CONTAINER_CLEANED,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED))
 
@@ -1213,21 +1334,21 @@ public abstract class TaskAttemptImpl implements
       return TaskAttemptState.STARTING;
     case COMMIT_PENDING:
       return TaskAttemptState.COMMIT_PENDING;
-    case FAILED:
-      return TaskAttemptState.FAILED;
-    case KILLED:
-      return TaskAttemptState.KILLED;
-      // All CLEANUP states considered as RUNNING since events have not gone out
-      // to the Task yet. May be possible to consider them as a Finished state.
     case FAIL_CONTAINER_CLEANUP:
     case FAIL_TASK_CLEANUP:
+    case FAIL_FINISHING_CONTAINER:
+    case FAILED:
+      return TaskAttemptState.FAILED;
     case KILL_CONTAINER_CLEANUP:
     case KILL_TASK_CLEANUP:
-    case SUCCESS_CONTAINER_CLEANUP:
+    case KILLED:
+      return TaskAttemptState.KILLED;
     case RUNNING:
       return TaskAttemptState.RUNNING;
     case NEW:
       return TaskAttemptState.NEW;
+    case SUCCESS_CONTAINER_CLEANUP:
+    case SUCCESS_FINISHING_CONTAINER:
     case SUCCEEDED:
       return TaskAttemptState.SUCCEEDED;
     default:
@@ -1429,6 +1550,15 @@ public abstract class TaskAttemptImpl implements
     }
   }
 
+  private static void finalizeProgress(TaskAttemptImpl taskAttempt) {
+    // unregister it from TaskAttemptListener so that it stops listening
+    taskAttempt.taskAttemptListener.unregister(
+        taskAttempt.attemptId, taskAttempt.jvmID);
+    taskAttempt.reportedStatus.progress = 1.0f;
+    taskAttempt.updateProgressSplits();
+  }
+
+
   static class RequestContainerTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     private final boolean rescheduled;
@@ -1661,53 +1791,66 @@ public abstract class TaskAttemptImpl implements
     }
   }
 
-  private static class SucceededTransition implements
+  /**
+   * Transition from SUCCESS_FINISHING_CONTAINER or FAIL_FINISHING_CONTAINER
+   * state upon receiving TA_CONTAINER_COMPLETED event
+   */
+  private static class ExitFinishingOnContainerCompletedTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @SuppressWarnings("unchecked")
     @Override
-    public void transition(TaskAttemptImpl taskAttempt, 
+    public void transition(TaskAttemptImpl taskAttempt,
+       TaskAttemptEvent event) {
+      taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister(
+          taskAttempt.attemptId);
+      sendContainerCompleted(taskAttempt);
+    }
+  }
+
+  private static class ExitFinishingOnContainerCleanedupTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt,
         TaskAttemptEvent event) {
-      //set the finish time
-      taskAttempt.setFinishTime();
-      taskAttempt.eventHandler.handle(
-          createJobCounterUpdateEventTASucceeded(taskAttempt));
-      taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
-      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
-          taskAttempt.attemptId,
-          TaskEventType.T_ATTEMPT_SUCCEEDED));
-      taskAttempt.eventHandler.handle
-      (new SpeculatorEvent
-          (taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
-   }
+      taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister(
+          taskAttempt.attemptId);
+    }
   }
 
   private static class FailedTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @SuppressWarnings("unchecked")
     @Override
-    public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
+    public void transition(TaskAttemptImpl taskAttempt,
+        TaskAttemptEvent event) {
       // set the finish time
       taskAttempt.setFinishTime();
-      
-      if (taskAttempt.getLaunchTime() != 0) {
-        taskAttempt.eventHandler
-            .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
-        TaskAttemptUnsuccessfulCompletionEvent tauce =
-            createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
-                TaskAttemptStateInternal.FAILED);
-        taskAttempt.eventHandler.handle(new JobHistoryEvent(
-            taskAttempt.attemptId.getTaskId().getJobId(), tauce));
-        // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not
-        // handling failed map/reduce events.
-      }else {
-        LOG.debug("Not generating HistoryFinish event since start event not " +
-            "generated for taskAttempt: " + taskAttempt.getID());
-      }
-      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
-          taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
+      notifyTaskAttemptFailed(taskAttempt);
     }
   }
 
+  private static class FinalizeFailedTransition extends FailedTransition {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt,
+        TaskAttemptEvent event) {
+      finalizeProgress(taskAttempt);
+      sendContainerCompleted(taskAttempt);
+      super.transition(taskAttempt, event);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static void sendContainerCompleted(TaskAttemptImpl taskAttempt) {
+    taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
+        taskAttempt.attemptId,
+        taskAttempt.container.getId(), StringInterner
+        .weakIntern(taskAttempt.container.getNodeId().toString()),
+        taskAttempt.container.getContainerToken(),
+        ContainerLauncher.EventType.CONTAINER_COMPLETED));
+  }
+
   private static class RecoverTransition implements
       MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
 
@@ -1832,6 +1975,35 @@ public abstract class TaskAttemptImpl implements
     }
   }
 
+  /**
+   * Transition taken when a kill arrives for an attempt that has already
+   * reported success: unregisters it from the finishing monitor and requests
+   * container cleanup. Returns SUCCESS_CONTAINER_CLEANUP for reduce attempts
+   * (the kill is ignored) and KILL_CONTAINER_CLEANUP for all other attempts.
+   */
+  private static class KilledAfterSucceededFinishingTransition
+      implements MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent,
+      TaskAttemptStateInternal> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt,
+        TaskAttemptEvent event) {
+      taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister(
+          taskAttempt.attemptId);
+      sendContainerCleanup(taskAttempt, event);
+      if (taskAttempt.getID().getTaskId().getTaskType() == TaskType.REDUCE) {
+        // A succeeded reduce attempt's output is already safe in HDFS, so it
+        // should not be killed. We only get here through an event-queue race:
+        // the kill was queued while the success event was still pending.
+        // Ignore the kill for reduce attempts.
+        LOG.info("Ignoring killed event for successful reduce task attempt "
+            + taskAttempt.getID().toString());
+        return TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP;
+      }
+      return TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP;
+    }
+  }
+
   private static class KilledTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
 
@@ -1887,6 +2059,31 @@ public abstract class TaskAttemptImpl implements
     }
   }
 
+  /**
+   * Transition from SUCCESS_FINISHING_CONTAINER or FAIL_FINISHING_CONTAINER
+   * state upon receiving TA_TIMED_OUT event
+   */
+  private static class ExitFinishingOnTimeoutTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt,
+        TaskAttemptEvent event) {
+      taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister(
+          taskAttempt.attemptId);
+      // The attempt stays in finishing state for too long
+      String msg = "Task attempt " + taskAttempt.getID() + " is done from " +
+          "TaskUmbilicalProtocol's point of view. However, it stays in " +
+          "finishing state for too long";
+      LOG.warn(msg);
+      taskAttempt.addDiagnosticInfo(msg);
+      sendContainerCleanup(taskAttempt, event);
+    }
+  }
+
+  /**
+   * Finish and clean up the container
+   */
   private static class CleanupContainerTransition implements
        SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @SuppressWarnings("unchecked")
@@ -1894,27 +2091,103 @@ public abstract class TaskAttemptImpl implements
     public void transition(TaskAttemptImpl taskAttempt, 
         TaskAttemptEvent event) {
       // unregister it to TaskAttemptListener so that it stops listening
-      // for it
-      taskAttempt.taskAttemptListener.unregister(
-          taskAttempt.attemptId, taskAttempt.jvmID);
+      // for it.
+      finalizeProgress(taskAttempt);
+      sendContainerCleanup(taskAttempt, event);
+    }
+  }
 
-      if (event instanceof TaskAttemptKillEvent) {
-        taskAttempt.addDiagnosticInfo(
-            ((TaskAttemptKillEvent) event).getMessage());
-      }
+  @SuppressWarnings("unchecked")
+  private static void sendContainerCleanup(TaskAttemptImpl taskAttempt,
+      TaskAttemptEvent event) {
+    if (event instanceof TaskAttemptKillEvent) {
+      taskAttempt.addDiagnosticInfo(
+          ((TaskAttemptKillEvent) event).getMessage());
+    }
+    //send the cleanup event to containerLauncher
+    taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
+        taskAttempt.attemptId,
+        taskAttempt.container.getId(), StringInterner
+        .weakIntern(taskAttempt.container.getNodeId().toString()),
+        taskAttempt.container.getContainerToken(),
+        ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
+  }
+
+  /**
+   * Transition to SUCCESS_FINISHING_CONTAINER upon receiving TA_DONE event
+   */
+  private static class MoveContainerToSucceededFinishingTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt,
+        TaskAttemptEvent event) {
+      finalizeProgress(taskAttempt);
+
+      // register it with the finishing monitor
+      taskAttempt.appContext.getTaskAttemptFinishingMonitor().register(
+          taskAttempt.attemptId);
+
+      // set the finish time
+      taskAttempt.setFinishTime();
+
+      // notify job history
+      taskAttempt.eventHandler.handle(
+          createJobCounterUpdateEventTASucceeded(taskAttempt));
+      taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
+
+      //notify the task even though the container might not have exited yet.
+      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+          taskAttempt.attemptId,
+          TaskEventType.T_ATTEMPT_SUCCEEDED));
+      taskAttempt.eventHandler.handle
+          (new SpeculatorEvent
+              (taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
 
-      taskAttempt.reportedStatus.progress = 1.0f;
-      taskAttempt.updateProgressSplits();
-      //send the cleanup event to containerLauncher
-      taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
-          taskAttempt.attemptId, 
-          taskAttempt.container.getId(), StringInterner
-              .weakIntern(taskAttempt.container.getNodeId().toString()),
-          taskAttempt.container.getContainerToken(),
-          ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
     }
   }
 
+  /**
+   * Transition to FAIL_FINISHING_CONTAINER upon receiving TA_FAILMSG event
+   */
+  private static class MoveContainerToFailedFinishingTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt,
+        TaskAttemptEvent event) {
+      finalizeProgress(taskAttempt);
+      // register it with the finishing monitor
+      taskAttempt.appContext.getTaskAttemptFinishingMonitor().register(
+          taskAttempt.attemptId);
+      notifyTaskAttemptFailed(taskAttempt);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt) {
+    // set the finish time
+    taskAttempt.setFinishTime();
+
+    if (taskAttempt.getLaunchTime() != 0) {
+      taskAttempt.eventHandler
+          .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
+      TaskAttemptUnsuccessfulCompletionEvent tauce =
+          createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+              TaskAttemptStateInternal.FAILED);
+      taskAttempt.eventHandler.handle(new JobHistoryEvent(
+          taskAttempt.attemptId.getTaskId().getJobId(), tauce));
+      // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not
+      // handling failed map/reduce events.
+    }else {
+      LOG.debug("Not generating HistoryFinish event since start event not " +
+          "generated for taskAttempt: " + taskAttempt.getID());
+    }
+    taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+        taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
+
+  }
+
   private void addDiagnosticInfo(String diag) {
     if (diag != null && !diag.equals("")) {
       diagnostics.add(diag);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java
index 40ecdb2..82360f0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java
@@ -27,7 +27,13 @@ public interface ContainerLauncher
 
   enum EventType {
     CONTAINER_REMOTE_LAUNCH,
-    CONTAINER_REMOTE_CLEANUP
+    CONTAINER_REMOTE_CLEANUP,
+    // When TaskAttempt receives TA_CONTAINER_COMPLETED,
+    // it will notify ContainerLauncher so that the container can be removed
+    // from ContainerLauncher's launched containers list.
+    // Otherwise, ContainerLauncher will try to stop the containers as part of
+    // serviceStop.
+    CONTAINER_COMPLETED
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
index 9c1125d..a7e966c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
@@ -121,7 +121,11 @@ public class ContainerLauncherImpl extends AbstractService implements
     public synchronized boolean isCompletelyDone() {
       return state == ContainerState.DONE || state == ContainerState.FAILED;
     }
-    
+
+    public synchronized void done() {
+      state = ContainerState.DONE;
+    }
+
     @SuppressWarnings("unchecked")
     public synchronized void launch(ContainerRemoteLaunchEvent event) {
       LOG.info("Launching " + taskAttemptID);
@@ -378,6 +382,11 @@ public class ContainerLauncherImpl extends AbstractService implements
       case CONTAINER_REMOTE_CLEANUP:
         c.kill();
         break;
+
+      case CONTAINER_COMPLETED:
+        c.done();
+        break;
+
       }
       removeContainerIfDone(containerID);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java
new file mode 100644
index 0000000..800f0e2
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java
@@ -0,0 +1,108 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapred;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptFinishingMonitor;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestTaskAttemptFinishingMonitor {
+
+  @Test
+  public void testFinshingAttemptTimeout()
+      throws IOException, InterruptedException {
+    SystemClock clock = new SystemClock();
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 100);
+    conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10);
+
+    AppContext appCtx = mock(AppContext.class);
+    JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+    RMHeartbeatHandler rmHeartbeatHandler =
+        mock(RMHeartbeatHandler.class);
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptFinishingMonitor taskAttemptFinishingMonitor =
+        new TaskAttemptFinishingMonitor(eventHandler);
+    taskAttemptFinishingMonitor.init(conf);
+    taskAttemptFinishingMonitor.start();
+
+    when(appCtx.getEventHandler()).thenReturn(eventHandler);
+    when(appCtx.getNMHostname()).thenReturn("0.0.0.0");
+    when(appCtx.getTaskAttemptFinishingMonitor()).thenReturn(
+        taskAttemptFinishingMonitor);
+    when(appCtx.getClock()).thenReturn(clock);
+
+    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+    policy.init(appCtx);
+    TaskAttemptListenerImpl listener =
+        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, policy);
+
+    listener.init(conf);
+    listener.start();
+
+    JobId jid = MRBuilderUtils.newJobId(12345, 1, 1);
+    TaskId tid = MRBuilderUtils.newTaskId(jid, 0,
+        org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0);
+    appCtx.getTaskAttemptFinishingMonitor().register(attemptId);
+    int check = 0;
+    while ( !eventHandler.timedOut &&  check++ < 10 ) {
+      Thread.sleep(100);
+    }
+    taskAttemptFinishingMonitor.stop();
+
+    assertTrue("Finishing attempt didn't time out.", eventHandler.timedOut);
+
+  }
+
+  /** Notes TA_TIMED_OUT events; volatile flag is polled by the test thread. */
+  public static class MockEventHandler implements EventHandler {
+    public volatile boolean timedOut = false;
+
+    @Override
+    public void handle(Event event) {
+      if (event instanceof TaskAttemptEvent
+          && ((TaskAttemptEvent) event).getType()
+              == TaskAttemptEventType.TA_TIMED_OUT) {
+        timedOut = true;
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
index 58db925..4fe4c44 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
@@ -482,6 +482,20 @@ public class MRApp extends MRAppMaster {
   }
 
   @Override
+  protected TaskAttemptFinishingMonitor
+      createTaskAttemptFinishingMonitor(
+      EventHandler eventHandler) {
+    return new TaskAttemptFinishingMonitor(eventHandler) {
+      @Override
+      public synchronized void register(TaskAttemptId attemptID) {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID,
+                TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+      }
+    };
+  }
+
+  @Override
   protected TaskAttemptListener createTaskAttemptListener(
       AppContext context, AMPreemptionPolicy policy) {
     return new TaskAttemptListener(){
@@ -541,6 +555,8 @@ public class MRApp extends MRAppMaster {
             new TaskAttemptEvent(event.getTaskAttemptID(),
                 TaskAttemptEventType.TA_CONTAINER_CLEANED));
         break;
+      case CONTAINER_COMPLETED:
+        break;
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
index a900241..e690f3f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
@@ -148,4 +148,10 @@ public class MockAppContext implements AppContext {
     // bogus - Not Required
     return null;
   }
+
+  @Override
+  public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
+      return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
index 4a36938..4d3f6f4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
@@ -223,6 +223,8 @@ public class TestFail {
                 new TaskAttemptEvent(event.getTaskAttemptID(),
                     TaskAttemptEventType.TA_CONTAINER_CLEANED));
             break;
+          case CONTAINER_COMPLETED:
+            super.handle(event);
           }
         }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
index c33bd4d..aae591e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
@@ -159,7 +159,7 @@ public class TestKill {
               super.dispatch(new TaskAttemptEvent(taID,
                 TaskAttemptEventType.TA_DONE));
               super.dispatch(new TaskAttemptEvent(taID,
-                TaskAttemptEventType.TA_CONTAINER_CLEANED));
+                TaskAttemptEventType.TA_CONTAINER_COMPLETED));
               super.dispatch(new TaskTAttemptEvent(taID,
                 TaskEventType.T_ATTEMPT_SUCCEEDED));
               this.cachedKillEvent = killEvent;
@@ -211,40 +211,9 @@ public class TestKill {
     app.getContext().getEventHandler()
       .handle(new JobEvent(jobId, JobEventType.JOB_KILL));
 
-    app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
-  }
-
-  static class MyAsyncDispatch extends AsyncDispatcher {
-    private CountDownLatch latch;
-    private TaskAttemptEventType attemptEventTypeToWait;
-    MyAsyncDispatch(CountDownLatch latch, TaskAttemptEventType attemptEventTypeToWait) {
-      super();
-      this.latch = latch;
-      this.attemptEventTypeToWait = attemptEventTypeToWait;
-    }
-
-    @Override
-    protected void dispatch(Event event) {
-      if (event instanceof TaskAttemptEvent) {
-        TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event;
-        TaskAttemptId attemptID = ((TaskAttemptEvent) event).getTaskAttemptID();
-        if (attemptEvent.getType() == this.attemptEventTypeToWait
-            && attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0 ) {
-          try {
-            latch.await();
-          } catch (InterruptedException e) {
-            e.printStackTrace();
-          }
-        }
-      }
-      super.dispatch(event);
-    }
+    app.waitForInternalState((JobImpl) job, JobStateInternal.KILLED);
   }
 
-  // This is to test a race condition where JobEventType.JOB_KILL is generated
-  // right after TaskAttemptEventType.TA_DONE is generated.
-  // TaskImpl's state machine might receive both T_ATTEMPT_SUCCEEDED
-  // and T_ATTEMPT_KILLED from the same attempt.
   @Test
   public void testKillTaskWaitKillJobAfterTA_DONE() throws Exception {
     CountDownLatch latch = new CountDownLatch(1);
@@ -269,15 +238,12 @@ public class TestKill {
     TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
     app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
 
-    // The order in the dispatch event queue, from the oldest to the newest
+    // The order in the dispatch event queue, from first to last
     // TA_DONE
-    // JOB_KILL
-    // CONTAINER_REMOTE_CLEANUP ( from TA_DONE's handling )
-    // T_KILL ( from JOB_KILL's handling )
-    // TA_CONTAINER_CLEANED ( from CONTAINER_REMOTE_CLEANUP's handling )
-    // TA_KILL ( from T_KILL's handling )
-    // T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_CLEANED's handling )
-    // T_ATTEMPT_KILLED ( from TA_KILL's handling )
+    // JobEventType.JOB_KILL
+    // TaskAttemptEventType.TA_CONTAINER_COMPLETED ( from TA_DONE handling )
+    // TaskEventType.T_KILL ( from JobEventType.JOB_KILL handling )
+    // TaskEventType.T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_COMPLETED handling )
 
     // Finish map
     app.getContext().getEventHandler().handle(
@@ -295,6 +261,100 @@ public class TestKill {
     app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
   }
 
+  // JOB_KILL is dispatched ahead of the map's TA_DONE; job must still end KILLED.
+  @Test
+  public void testKillTaskWaitKillJobBeforeTA_DONE() throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    final Dispatcher dispatcher = new MyAsyncDispatch(latch, JobEventType.JOB_KILL);
+    MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) {
+      @Override
+      public Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    Job job = app.submit(new Configuration());
+    JobId jobId = app.getJobId();
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask = it.next();
+    Task reduceTask = it.next();
+    app.waitForState(mapTask, TaskState.RUNNING);
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
+    TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+    // Order of the events in the dispatcher queue, from first to last:
+    // JobEventType.JOB_KILL
+    // TaskAttemptEventType.TA_DONE
+    // TaskEventType.T_KILL ( from JobEventType.JOB_KILL handling )
+    // TaskAttemptEventType.TA_CONTAINER_COMPLETED ( from TA_DONE handling )
+    // TaskAttemptEventType.TA_KILL ( from TaskEventType.T_KILL handling )
+    // TaskEventType.T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_COMPLETED handling )
+    // TaskEventType.T_ATTEMPT_KILLED ( from TA_KILL handling )
+
+    // Kill the job first; the dispatcher holds JOB_KILL until the latch opens
+    app.getContext().getEventHandler()
+        .handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+
+    // Report the map attempt as done while JOB_KILL is still being held
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapAttempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    // Release the latch so the dispatcher works through the queued events
+    latch.countDown();
+
+    app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
+  }
+
+  /** Dispatcher that stalls one chosen event until the latch is released. */
+  static class MyAsyncDispatch extends AsyncDispatcher {
+    private final CountDownLatch latch;
+    private TaskAttemptEventType attemptEventTypeToWait;
+    private JobEventType jobEventTypeToWait;
+    MyAsyncDispatch(CountDownLatch latch, TaskAttemptEventType attemptEventTypeToWait) {
+      super();
+      this.latch = latch;
+      this.attemptEventTypeToWait = attemptEventTypeToWait;
+    }
+
+    MyAsyncDispatch(CountDownLatch latch, JobEventType jobEventTypeToWait) {
+      super();
+      this.latch = latch;
+      this.jobEventTypeToWait = jobEventTypeToWait;
+    }
+
+    /** Holds a matching event on the latch, then dispatches it as usual. */
+    @Override
+    protected void dispatch(Event event) {
+      if (isEventToWaitFor(event)) {
+        try {
+          latch.await();
+        } catch (InterruptedException e) {
+          // Restore the interrupt flag rather than swallowing it.
+          Thread.currentThread().interrupt();
+        }
+      }
+      super.dispatch(event);
+    }
+
+    /** Tells whether this is the event the dispatcher was set up to stall. */
+    private boolean isEventToWaitFor(Event event) {
+      if (event instanceof TaskAttemptEvent) {
+        TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event;
+        TaskAttemptId attemptID = attemptEvent.getTaskAttemptID();
+        return attemptEvent.getType() == this.attemptEventTypeToWait
+            && attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0;
+      }
+      return event instanceof JobEvent
+          && ((JobEvent) event).getType() == this.jobEventTypeToWait;
+    }
+  }
+
   @Test
   public void testKillTaskAttempt() throws Exception {
     final CountDownLatch latch = new CountDownLatch(1);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
index 69f2709..475cd1f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
@@ -884,5 +884,10 @@ public class TestRuntimeEstimators {
       // bogus - Not Required
       return null;
     }
+
+    @Override
+    public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
+      return null; // no finishing monitor is supplied here
+    }
   }
 }