You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/28 11:01:33 UTC

tez git commit: TEZ-2495. Inform TaskCommunicaor about Task and Container termination reasons. (sseth)

Repository: tez
Updated Branches:
  refs/heads/TEZ-2003 ecbfc8d92 -> 1cca40047


TEZ-2495. Inform TaskCommunicaor about Task and Container termination reasons. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1cca4004
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1cca4004
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1cca4004

Branch: refs/heads/TEZ-2003
Commit: 1cca40047cb4d966284064d1f7da0e0313819923
Parents: ecbfc8d
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu May 28 02:01:04 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu May 28 02:01:04 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  1 +
 .../org/apache/tez/common/TezUtilsInternal.java | 31 ++++++++-----
 .../apache/tez/dag/api/ContainerEndReason.java  | 27 +++++++++++
 .../tez/dag/api/TaskAttemptEndReason.java       | 13 +++---
 .../apache/tez/dag/api/TaskCommunicator.java    | 11 +++--
 .../apache/tez/dag/app/TaskAttemptListener.java |  6 ++-
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  9 ++--
 .../tez/dag/app/TezTaskCommunicatorImpl.java    |  6 ++-
 .../rm/container/AMContainerEventCompleted.java | 41 +++++++++++++++++
 .../dag/app/rm/container/AMContainerImpl.java   | 35 ++++++++-------
 .../app/TestTaskAttemptListenerImplTezDag.java  |  8 ++--
 .../dag/app/rm/container/TestAMContainer.java   | 47 +++++++++++---------
 .../TezTestServiceTaskCommunicatorImpl.java     |  9 ++--
 .../apache/tez/runtime/task/TezTaskRunner2.java | 20 +++++++--
 14 files changed, 186 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1cca4004/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index d651960..e333832 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -28,5 +28,6 @@ ALL CHANGES:
   TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks.
   TEZ-2465. Retrun the status of a kill request in TaskRunner2.
   TEZ-2471. NPE in LogicalIOProcessorRuntimeTask while printing thread info.
+  TEZ-2495. Inform TaskCommunicaor about Task and Container termination reasons.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/1cca4004/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 347a4f6..0bdeb79 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -246,10 +246,16 @@ public class TezUtilsInternal {
         return TaskAttemptTerminationCause.COMMUNICATION_ERROR;
       case SERVICE_BUSY:
         return TaskAttemptTerminationCause.SERVICE_BUSY;
-      case INTERRUPTED_BY_SYSTEM:
-        return TaskAttemptTerminationCause.INTERRUPTED_BY_SYSTEM;
-      case INTERRUPTED_BY_USER:
-        return TaskAttemptTerminationCause.INTERRUPTED_BY_USER;
+      case INTERNAL_PREEMPTION:
+        return TaskAttemptTerminationCause.INTERNAL_PREEMPTION;
+      case EXTERNAL_PREEMPTION:
+        return TaskAttemptTerminationCause.EXTERNAL_PREEMPTION;
+      case APPLICATION_ERROR:
+        return TaskAttemptTerminationCause.APPLICATION_ERROR;
+      case FRAMEWORK_ERROR:
+        return TaskAttemptTerminationCause.FRAMEWORK_ERROR;
+      case NODE_FAILED:
+        return TaskAttemptTerminationCause.NODE_FAILED;
       case OTHER:
         return TaskAttemptTerminationCause.UNKNOWN_ERROR;
       default:
@@ -267,20 +273,24 @@ public class TezUtilsInternal {
         return TaskAttemptEndReason.COMMUNICATION_ERROR;
       case SERVICE_BUSY:
         return TaskAttemptEndReason.SERVICE_BUSY;
+      case INTERNAL_PREEMPTION:
+        return TaskAttemptEndReason.INTERNAL_PREEMPTION;
+      case EXTERNAL_PREEMPTION:
+        return TaskAttemptEndReason.EXTERNAL_PREEMPTION;
+      case APPLICATION_ERROR:
+        return TaskAttemptEndReason.APPLICATION_ERROR;
+      case FRAMEWORK_ERROR:
+        return TaskAttemptEndReason.FRAMEWORK_ERROR;
+      case NODE_FAILED:
+        return TaskAttemptEndReason.NODE_FAILED;
       case INTERRUPTED_BY_SYSTEM:
-        return TaskAttemptEndReason.INTERRUPTED_BY_SYSTEM;
       case INTERRUPTED_BY_USER:
-        return TaskAttemptEndReason.INTERRUPTED_BY_USER;
       case UNKNOWN_ERROR:
       case TERMINATED_BY_CLIENT:
       case TERMINATED_AT_SHUTDOWN:
-      case INTERNAL_PREEMPTION:
-      case EXTERNAL_PREEMPTION:
       case TERMINATED_INEFFECTIVE_SPECULATION:
       case TERMINATED_EFFECTIVE_SPECULATION:
       case TERMINATED_ORPHANED:
-      case APPLICATION_ERROR:
-      case FRAMEWORK_ERROR:
       case INPUT_READ_ERROR:
       case OUTPUT_WRITE_ERROR:
       case OUTPUT_LOST:
@@ -288,7 +298,6 @@ public class TezUtilsInternal {
       case CONTAINER_LAUNCH_FAILED:
       case CONTAINER_EXITED:
       case CONTAINER_STOPPED:
-      case NODE_FAILED:
       case NODE_DISK_ERROR:
       default:
         return TaskAttemptEndReason.OTHER;

http://git-wip-us.apache.org/repos/asf/tez/blob/1cca4004/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java b/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java
new file mode 100644
index 0000000..e13e886
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+// TODO TEZ-2003 Expose as a public API
+public enum ContainerEndReason {
+  NODE_FAILED, // Completed because the node running the container was marked as dead
+  INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision
+  EXTERNAL_PREEMPTION, // Preempted due to cluster contention
+  APPLICATION_ERROR, // An error in the AM caused by user code
+  FRAMEWORK_ERROR, // An error in the AM - likely a bug.
+  LAUNCH_FAILED, // Failure to launch the container
+  COMPLETED, // Completed via normal flow
+  OTHER
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1cca4004/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java b/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
index 96a4768..de78d21 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
@@ -16,9 +16,12 @@ package org.apache.tez.dag.api;
 
 // TODO TEZ-2003 Expose as a public API
 public enum TaskAttemptEndReason {
-  COMMUNICATION_ERROR,
-  SERVICE_BUSY,
-  INTERRUPTED_BY_SYSTEM,
-  INTERRUPTED_BY_USER,
-  OTHER
+  NODE_FAILED, // Completed because the node running the container was marked as dead
+  COMMUNICATION_ERROR, // Communication error with the task
+  SERVICE_BUSY, // External service busy
+  INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision
+  EXTERNAL_PREEMPTION, // Preempted due to cluster contention
+  APPLICATION_ERROR, // An error in the AM caused by user code
+  FRAMEWORK_ERROR, // An error in the AM - likely a bug.
+  OTHER // Unknown reason
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1cca4004/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index 2651013..d0a006b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -36,7 +36,10 @@ public abstract class TaskCommunicator extends AbstractService {
   public abstract void registerRunningContainer(ContainerId containerId, String hostname, int port);
 
   // TODO TEZ-2003 Ideally, don't expose YARN containerId; instead expose a Tez specific construct.
-  public abstract void registerContainerEnd(ContainerId containerId);
+  public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason);
+
+  // TODO TEZ-2003 Provide additional inforamtion like reason for container end / Task end.
+  // Was it caused by preemption - or as a result of a general task completion / container completion
 
   // TODO TEZ-2003 TaskSpec breakup into a clean interface
   // TODO TEZ-2003 Add support for priority
@@ -48,11 +51,7 @@ public abstract class TaskCommunicator extends AbstractService {
   // TODO TEZ-2003. Are additional APIs required to mark a container as completed ? - for completeness.
 
   // TODO TEZ-2003 Remove reference to TaskAttemptID
-  // TODO TEZ-2003 This needs some information about why the attempt is being unregistered.
-  // e.g. preempted in which case the task may need to be informed. Alternately as a result of
-  // a failed task.
-  // In case of preemption - a killTask API is likely a better bet than trying to overload this method.
-  public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID);
+  public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason);
 
   // TODO TEZ-2003 This doesn't necessarily belong here. A server may not start within the AM.
   public abstract InetSocketAddress getAddress();

http://git-wip-us.apache.org/repos/asf/tez/blob/1cca4004/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
index e4dad27..92e38ae 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
@@ -21,6 +21,8 @@ package org.apache.tez.dag.app;
 import java.net.InetSocketAddress;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.ContainerEndReason;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.app.rm.container.AMContainerTask;
@@ -34,9 +36,9 @@ public interface TaskAttemptListener {
 
   void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId);
   
-  void unregisterRunningContainer(ContainerId containerId, int taskCommId);
+  void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason);
   
-  void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId);
+  void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId, TaskAttemptEndReason endReason);
 
   void dagComplete(DAG dag);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1cca4004/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 1182d54..1d9bdc1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.collections4.ListUtils;
+import org.apache.tez.dag.api.ContainerEndReason;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
@@ -356,7 +357,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   }
 
   @Override
-  public void unregisterRunningContainer(ContainerId containerId, int taskCommId) {
+  public void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId);
     }
@@ -364,7 +365,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     if (containerInfo.taskAttemptId != null) {
       registeredAttempts.remove(containerInfo.taskAttemptId);
     }
-    taskCommunicators[taskCommId].registerContainerEnd(containerId);
+    taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason);
   }
 
   @Override
@@ -405,7 +406,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   }
 
   @Override
-  public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId) {
+  public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId, TaskAttemptEndReason endReason) {
     ContainerId containerId = registeredAttempts.remove(attemptId);
     if (containerId == null) {
       LOG.warn("Unregister task attempt: " + attemptId + " from unknown container");
@@ -419,7 +420,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
     // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
     registeredContainers.put(containerId, NULL_CONTAINER_INFO);
-    taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId);
+    taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/1cca4004/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 34c8822..ee6ce6e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -41,6 +41,8 @@ import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.ContainerEndReason;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
 import org.apache.tez.dag.api.TaskHeartbeatRequest;
@@ -183,7 +185,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
   }
 
   @Override
-  public void registerContainerEnd(ContainerId containerId) {
+  public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason) {
     ContainerInfo containerInfo = registeredContainers.remove(containerId);
     if (containerInfo != null) {
       synchronized(containerInfo) {
@@ -229,7 +231,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
 
 
   @Override
-  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
+  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason) {
     TaskAttempt taskAttempt = new TaskAttempt(taskAttemptID);
     ContainerId containerId = attemptToContainerMap.remove(taskAttempt);
     if(containerId == null) {

http://git-wip-us.apache.org/repos/asf/tez/blob/1cca4004/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
index 9bb6d7f..8ef2a83 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app.rm.container;
 
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.ContainerEndReason;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 
 public class AMContainerEventCompleted extends AMContainerEvent {
@@ -61,4 +62,44 @@ public class AMContainerEventCompleted extends AMContainerEvent {
     return errCause;
   }
 
+  public ContainerEndReason getContainerEndReason() {
+    if (errCause != null) {
+      switch (errCause) {
+        case INTERNAL_PREEMPTION:
+          return ContainerEndReason.INTERNAL_PREEMPTION;
+        case EXTERNAL_PREEMPTION:
+          return ContainerEndReason.EXTERNAL_PREEMPTION;
+        case FRAMEWORK_ERROR:
+          return ContainerEndReason.FRAMEWORK_ERROR;
+        case APPLICATION_ERROR:
+          return ContainerEndReason.APPLICATION_ERROR;
+        case CONTAINER_LAUNCH_FAILED:
+          return ContainerEndReason.LAUNCH_FAILED;
+        case NODE_FAILED:
+          return ContainerEndReason.NODE_FAILED;
+        case CONTAINER_EXITED:
+          return ContainerEndReason.COMPLETED;
+        case UNKNOWN_ERROR:
+        case TERMINATED_BY_CLIENT:
+        case TERMINATED_AT_SHUTDOWN:
+        case TERMINATED_INEFFECTIVE_SPECULATION:
+        case TERMINATED_EFFECTIVE_SPECULATION:
+        case TERMINATED_ORPHANED:
+        case INPUT_READ_ERROR:
+        case OUTPUT_WRITE_ERROR:
+        case OUTPUT_LOST:
+        case TASK_HEARTBEAT_ERROR:
+        case CONTAINER_STOPPED:
+        case NODE_DISK_ERROR:
+        case COMMUNICATION_ERROR:
+        case SERVICE_BUSY:
+        case INTERRUPTED_BY_SYSTEM:
+        case INTERRUPTED_BY_USER:
+        default:
+          return ContainerEndReason.OTHER;
+      }
+    } else {
+      return ContainerEndReason.OTHER;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1cca4004/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 39df2e8..e9e0f04 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -27,6 +27,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.dag.api.ContainerEndReason;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.security.Credentials;
@@ -624,7 +627,7 @@ public class AMContainerImpl implements AMContainer {
         container.sendTerminatingToTaskAttempt(container.currentAttempt,
             event.getMessage(), TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED);
       }
-      container.unregisterFromTAListener();
+      container.unregisterFromTAListener(ContainerEndReason.LAUNCH_FAILED);
       container.deAllocate();
     }
   }
@@ -654,7 +657,7 @@ public class AMContainerImpl implements AMContainer {
       }
       container.containerLocalResources = null;
       container.additionalLocalResources = null;
-      container.unregisterFromTAListener();
+      container.unregisterFromTAListener(event.getContainerEndReason());
       String diag = event.getDiagnostics();
       if (!(diag == null || diag.equals(""))) {
         LOG.info("Container " + container.getContainerId()
@@ -680,7 +683,7 @@ public class AMContainerImpl implements AMContainer {
         container.sendTerminatingToTaskAttempt(container.currentAttempt,
             getMessage(container, cEvent), TaskAttemptTerminationCause.CONTAINER_STOPPED);
       }
-      container.unregisterFromTAListener();
+      container.unregisterFromTAListener(ContainerEndReason.OTHER);
       container.logStopped(container.currentAttempt == null ?
           ContainerExitStatus.SUCCESS 
           : ContainerExitStatus.INVALID);
@@ -732,7 +735,7 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       super.transition(container, cEvent);
-      container.unregisterFromTAListener();
+      container.unregisterFromTAListener(ContainerEndReason.NODE_FAILED);
       container.deAllocate();
     }
   }
@@ -749,7 +752,7 @@ public class AMContainerImpl implements AMContainer {
                 container.getState(), TaskAttemptTerminationCause.FRAMEWORK_ERROR);
       }
       container.logStopped(ContainerExitStatus.ABORTED);
-      container.unregisterFromTAListener();
+      container.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR);
       container.sendStopRequestToNM();
     }
   }
@@ -821,7 +824,7 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
 
       AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
-      container.unregisterAttemptFromListener(container.currentAttempt);
+      container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.FRAMEWORK_ERROR);
       container.handleExtraTAAssign(event, container.currentAttempt);
     }
   }
@@ -832,7 +835,7 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       container.lastTaskFinishTime = System.currentTimeMillis();
       container.completedAttempts.add(container.currentAttempt);
-      container.unregisterAttemptFromListener(container.currentAttempt);
+      container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.OTHER);
       container.currentAttempt = null;
     }
   }
@@ -849,7 +852,7 @@ public class AMContainerImpl implements AMContainer {
         container.sendTerminatedToTaskAttempt(container.currentAttempt,
             getMessage(container, event), event.getTerminationCause());
       }
-      container.unregisterAttemptFromListener(container.currentAttempt);
+      container.unregisterAttemptFromListener(container.currentAttempt, TezUtilsInternal.toTaskAttemptEndReason(event.getTerminationCause()));
       container.registerFailedAttempt(container.currentAttempt);
       container.currentAttempt= null;
       super.transition(container, cEvent);
@@ -859,7 +862,7 @@ public class AMContainerImpl implements AMContainer {
   protected static class StopRequestAtRunningTransition
       extends StopRequestAtIdleTransition {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
-      container.unregisterAttemptFromListener(container.currentAttempt);
+      container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.OTHER);
       super.transition(container, cEvent);
     }
   }
@@ -880,7 +883,7 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       super.transition(container, cEvent);
-      container.unregisterAttemptFromListener(container.currentAttempt);
+      container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.NODE_FAILED);
     }
   }
 
@@ -889,7 +892,7 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       super.transition(container, cEvent);
-      container.unregisterAttemptFromListener(container.currentAttempt);
+      container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.FRAMEWORK_ERROR);
       container.sendTerminatingToTaskAttempt(container.currentAttempt,
           "Container " + container.getContainerId() +
               " hit an invalid transition - " + cEvent.getType() + " at " +
@@ -1015,7 +1018,7 @@ public class AMContainerImpl implements AMContainer {
     LOG.warn(errorMessage);
     this.logStopped(ContainerExitStatus.INVALID);
     this.sendStopRequestToNM();
-    this.unregisterFromTAListener();
+    this.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR);
     this.unregisterFromContainerListener();
   }
 
@@ -1073,8 +1076,8 @@ public class AMContainerImpl implements AMContainer {
         container.getNodeId(), container.getContainerToken(), launcherId));
   }
 
-  protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId) {
-    taskAttemptListener.unregisterTaskAttempt(attemptId, taskCommId);
+  protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId, TaskAttemptEndReason endReason) {
+    taskAttemptListener.unregisterTaskAttempt(attemptId, taskCommId, endReason);
   }
 
   protected void registerAttemptWithListener(AMContainerTask amContainerTask) {
@@ -1085,8 +1088,8 @@ public class AMContainerImpl implements AMContainer {
     taskAttemptListener.registerRunningContainer(containerId, taskCommId);
   }
 
-  protected void unregisterFromTAListener() {
-    this.taskAttemptListener.unregisterRunningContainer(containerId, taskCommId);
+  protected void unregisterFromTAListener(ContainerEndReason endReason) {
+    this.taskAttemptListener.unregisterRunningContainer(containerId, taskCommId, endReason);
   }
 
   protected void registerWithContainerListener() {

http://git-wip-us.apache.org/repos/asf/tez/blob/1cca4004/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 5924fc1..46a8f2c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.api.ContainerEndReason;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskHeartbeatRequest;
 import org.apache.tez.dag.api.TaskHeartbeatResponse;
 import org.apache.tez.dag.api.TezException;
@@ -154,12 +156,12 @@ public class TestTaskAttemptListenerImplTezDag {
     assertEquals(taskSpec, containerTask.getTaskSpec());
 
     // Task unregistered. Should respond to heartbeats
-    taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0);
+    taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0, TaskAttemptEndReason.OTHER);
     containerTask = tezUmbilical.getTask(containerContext2);
     assertNull(containerTask);
 
     // Container unregistered. Should send a shouldDie = true
-    taskAttemptListener.unregisterRunningContainer(containerId2, 0);
+    taskAttemptListener.unregisterRunningContainer(containerId2, 0, ContainerEndReason.OTHER);
     containerTask = tezUmbilical.getTask(containerContext2);
     assertTrue(containerTask.shouldDie());
 
@@ -173,7 +175,7 @@ public class TestTaskAttemptListenerImplTezDag {
     doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID();
     AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec, null, null, false, 0);
     taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3, 0);
-    taskAttemptListener.unregisterRunningContainer(containerId3, 0);
+    taskAttemptListener.unregisterRunningContainer(containerId3, 0, ContainerEndReason.OTHER);
     containerTask = tezUmbilical.getTask(containerContext3);
     assertTrue(containerTask.shouldDie());
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/1cca4004/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index bdd0f61..b8b4774 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.ContainerEndReason;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
@@ -132,14 +134,14 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.IDLE);
     wc.verifyNoOutgoingEvents();
     assertNull(wc.amContainer.getCurrentTaskAttempt());
-    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0);
+    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER);
 
     // Container completed
     wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
     verify(wc.chh).unregister(wc.containerID);
 
     assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
@@ -181,13 +183,13 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.IDLE);
     wc.verifyNoOutgoingEvents();
     assertNull(wc.amContainer.getCurrentTaskAttempt());
-    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0);
+    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER);
 
     wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
     verify(wc.chh).unregister(wc.containerID);
 
     assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
@@ -232,7 +234,7 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.IDLE);
     wc.verifyNoOutgoingEvents();
     assertNull(wc.amContainer.getCurrentTaskAttempt());
-    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0);
+    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER);
 
     TezTaskAttemptID taId2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
     wc.assignTaskAttempt(taId2);
@@ -247,14 +249,14 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.IDLE);
     wc.verifyNoOutgoingEvents();
     assertNull(wc.amContainer.getCurrentTaskAttempt());
-    verify(wc.tal).unregisterTaskAttempt(taId2, 0);
+    verify(wc.tal).unregisterTaskAttempt(taId2, 0, TaskAttemptEndReason.OTHER);
 
     // Container completed
     wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
     verify(wc.chh).unregister(wc.containerID);
 
     assertEquals(2, wc.amContainer.getAllTaskAttempts().size());
@@ -287,7 +289,7 @@ public class TestAMContainer {
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
     verify(wc.chh).unregister(wc.containerID);
 
     assertNull(wc.amContainer.getCurrentTaskAttempt());
@@ -324,7 +326,7 @@ public class TestAMContainer {
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
     verify(wc.chh).unregister(wc.containerID);
 
     assertNull(wc.amContainer.getCurrentTaskAttempt());
@@ -347,7 +349,7 @@ public class TestAMContainer {
     wc.assignTaskAttempt(taID2);
 
     wc.verifyState(AMContainerState.STOP_REQUESTED);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.FRAMEWORK_ERROR);
     verify(wc.chh).unregister(wc.containerID);
     // 1 for NM stop request. 2 TERMINATING to TaskAttempt.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
@@ -385,7 +387,7 @@ public class TestAMContainer {
     wc.assignTaskAttempt(taID2);
 
     wc.verifyState(AMContainerState.STOP_REQUESTED);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.FRAMEWORK_ERROR);
     verify(wc.chh).unregister(wc.containerID);
     // 1 for NM stop request. 2 TERMINATING to TaskAttempt.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
@@ -421,7 +423,7 @@ public class TestAMContainer {
 
     wc.containerTimedOut();
     wc.verifyState(AMContainerState.STOP_REQUESTED);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
     verify(wc.chh).unregister(wc.containerID);
     // 1 to TA, 1 for RM de-allocate.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -455,7 +457,7 @@ public class TestAMContainer {
 
     wc.stopRequest();
     wc.verifyState(AMContainerState.STOP_REQUESTED);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
     verify(wc.chh).unregister(wc.containerID);
     // 1 to TA, 1 for RM de-allocate.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -489,7 +491,7 @@ public class TestAMContainer {
     wc.launchFailed();
     wc.verifyState(AMContainerState.STOPPING);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.LAUNCH_FAILED);
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -539,7 +541,7 @@ public class TestAMContainer {
     wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -569,7 +571,7 @@ public class TestAMContainer {
     wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR);
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -600,7 +602,7 @@ public class TestAMContainer {
     wc.containerCompleted(ContainerExitStatus.ABORTED, TaskAttemptTerminationCause.NODE_FAILED);
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.NODE_FAILED);
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -631,7 +633,7 @@ public class TestAMContainer {
     wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -660,7 +662,7 @@ public class TestAMContainer {
     wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -695,7 +697,7 @@ public class TestAMContainer {
     wc.containerCompleted(ContainerExitStatus.PREEMPTED, TaskAttemptTerminationCause.EXTERNAL_PREEMPTION);
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.EXTERNAL_PREEMPTION);
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -732,7 +734,8 @@ public class TestAMContainer {
     wc.containerCompleted(ContainerExitStatus.INVALID, TaskAttemptTerminationCause.INTERNAL_PREEMPTION);
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0,
+        ContainerEndReason.INTERNAL_PREEMPTION);
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -769,7 +772,7 @@ public class TestAMContainer {
     wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR);
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1cca4004/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
index cf28b11..98673a6 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.ContainerEndReason;
 import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
 import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
@@ -98,8 +99,8 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
   }
 
   @Override
-  public void registerContainerEnd(ContainerId containerId) {
-    super.registerContainerEnd(containerId);
+  public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason) {
+    super.registerContainerEnd(containerId, endReason);
   }
 
   @Override
@@ -175,8 +176,8 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
   }
 
   @Override
-  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
-    super.unregisterRunningTaskAttempt(taskAttemptID);
+  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason) {
+    super.unregisterRunningTaskAttempt(taskAttemptID, endReason);
     // Nothing else to do for now. The push API in the test does not support termination of a running task
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1cca4004/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index 3bf9f84..15629fd 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -261,7 +261,13 @@ public class TezTaskRunner2 {
             taskRunnerCallable.interruptTask();
           }
           return true;
+        } else {
+          LOG.info("Ignoring killTask request for {} since end reason is already set to {}",
+              task.getTaskAttemptID(), firstEndReason);
         }
+      } else {
+        LOG.info("Ignoring killTask request for {} since it is not in a running state",
+            task.getTaskAttemptID());
       }
     }
     return false;
@@ -389,10 +395,18 @@ public class TezTaskRunner2 {
         isFirstTerminate = trySettingEndReason(EndReason.CONTAINER_STOP_REQUESTED);
         // Respect stopContainerRequested since it can come in at any point, despite a previous failure.
         stopContainerRequested.set(true);
-      }
 
-      if (isFirstTerminate) {
-        killTask();
+        if (isFirstTerminate) {
+          LOG.info("Attempting to abort {} since a shutdown request was received",
+              task.getTaskAttemptID());
+          if (taskRunnerCallable != null) {
+            taskKillStartTime = System.currentTimeMillis();
+            taskRunnerCallable.interruptTask();
+          }
+        } else {
+          LOG.info("Not acting on shutdown request for {} since the task is not in running state",
+              task.getTaskAttemptID());
+        }
       }
     }
   }