You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:43:07 UTC

[29/43] tez git commit: TEZ-2187. Allow TaskCommunicators to report failed / killed attempts. (sseth)

TEZ-2187. Allow TaskCommunicators to report failed / killed attempts.  (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e1ab1914
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e1ab1914
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e1ab1914

Branch: refs/heads/TEZ-2003
Commit: e1ab191494658b5470d12d4b25b016530b28d398
Parents: 99a1b85
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Mar 10 01:25:39 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 8 14:41:30 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |   1 +
 .../org/apache/tez/common/TezUtilsInternal.java |  60 +++++++++
 .../tez/dag/api/TaskAttemptEndReason.java       |  24 ++++
 .../records/TaskAttemptTerminationCause.java    |   7 +-
 .../apache/tez/dag/api/TaskCommunicator.java    |   2 +
 .../tez/dag/api/TaskCommunicatorContext.java    |  13 +-
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  33 +++++
 .../event/TaskAttemptEventAttemptFailed.java    |   2 +
 .../event/TaskAttemptEventAttemptKilled.java    |  47 +++++++
 .../dag/app/dag/event/TaskAttemptEventType.java |   5 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  33 ++++-
 .../tez/dag/app/rm/AMSchedulerEventTAEnded.java |   9 +-
 .../dag/app/rm/LocalTaskSchedulerService.java   |   3 +-
 .../dag/app/rm/TaskSchedulerEventHandler.java   |   7 +-
 .../tez/dag/app/rm/TaskSchedulerService.java    |   6 +-
 .../dag/app/rm/YarnTaskSchedulerService.java    |   8 +-
 .../app/TestTaskAttemptListenerImplTezDag.java  |   1 +
 .../app/TestTaskAttemptListenerImplTezDag2.java | 126 +++++++++++++++++++
 .../tez/dag/app/rm/TestContainerReuse.java      |  65 +++++-----
 .../app/rm/TestLocalTaskSchedulerService.java   |   5 +-
 .../tez/dag/app/rm/TestTaskScheduler.java       |  18 +--
 .../rm/TezTestServiceTaskSchedulerService.java  |   3 +-
 .../TezTestServiceTaskCommunicatorImpl.java     |  36 +++++-
 .../org/apache/tez/service/ContainerRunner.java |   5 +-
 .../tez/service/MiniTezTestServiceCluster.java  |   5 +-
 .../tez/service/impl/ContainerRunnerImpl.java   |  60 +++++++--
 .../apache/tez/service/impl/TezTestService.java |   6 +-
 .../impl/TezTestServiceProtocolServerImpl.java  |  10 +-
 .../tez/tests/TestExternalTezServices.java      |  29 +++++
 29 files changed, 548 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 7726815..774a685 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -10,5 +10,6 @@ ALL CHANGES:
   TEZ-2138. Fix minor bugs in adding default scheduler, getting launchers.
   TEZ-2139. Update tez version to 0.7.0-TEZ-2003-SNAPSHOT.
   TEZ-2175. Task priority should be available to the TaskCommunicator plugin.
+  TEZ-2187. Allow TaskCommunicators to report failed / killed attempts.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 9c78377..347a4f6 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.log4j.Appender;
 import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
@@ -49,6 +50,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Stopwatch;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 
 @Private
 public class TezUtilsInternal {
@@ -234,4 +236,62 @@ public class TezUtilsInternal {
     return sb.toString();
   }
 
+  public static TaskAttemptTerminationCause fromTaskAttemptEndReason(
+      TaskAttemptEndReason taskAttemptEndReason) {
+    if (taskAttemptEndReason == null) {
+      return null;
+    }
+    switch (taskAttemptEndReason) {
+      case COMMUNICATION_ERROR:
+        return TaskAttemptTerminationCause.COMMUNICATION_ERROR;
+      case SERVICE_BUSY:
+        return TaskAttemptTerminationCause.SERVICE_BUSY;
+      case INTERRUPTED_BY_SYSTEM:
+        return TaskAttemptTerminationCause.INTERRUPTED_BY_SYSTEM;
+      case INTERRUPTED_BY_USER:
+        return TaskAttemptTerminationCause.INTERRUPTED_BY_USER;
+      case OTHER:
+        return TaskAttemptTerminationCause.UNKNOWN_ERROR;
+      default:
+        return TaskAttemptTerminationCause.UNKNOWN_ERROR;
+    }
+  }
+
+  public static TaskAttemptEndReason toTaskAttemptEndReason(TaskAttemptTerminationCause cause) {
+    // TODO Post TEZ-2003. Consolidate these states, and mappings.
+    if (cause == null) {
+      return null;
+    }
+    switch (cause) {
+      case COMMUNICATION_ERROR:
+        return TaskAttemptEndReason.COMMUNICATION_ERROR;
+      case SERVICE_BUSY:
+        return TaskAttemptEndReason.SERVICE_BUSY;
+      case INTERRUPTED_BY_SYSTEM:
+        return TaskAttemptEndReason.INTERRUPTED_BY_SYSTEM;
+      case INTERRUPTED_BY_USER:
+        return TaskAttemptEndReason.INTERRUPTED_BY_USER;
+      case UNKNOWN_ERROR:
+      case TERMINATED_BY_CLIENT:
+      case TERMINATED_AT_SHUTDOWN:
+      case INTERNAL_PREEMPTION:
+      case EXTERNAL_PREEMPTION:
+      case TERMINATED_INEFFECTIVE_SPECULATION:
+      case TERMINATED_EFFECTIVE_SPECULATION:
+      case TERMINATED_ORPHANED:
+      case APPLICATION_ERROR:
+      case FRAMEWORK_ERROR:
+      case INPUT_READ_ERROR:
+      case OUTPUT_WRITE_ERROR:
+      case OUTPUT_LOST:
+      case TASK_HEARTBEAT_ERROR:
+      case CONTAINER_LAUNCH_FAILED:
+      case CONTAINER_EXITED:
+      case CONTAINER_STOPPED:
+      case NODE_FAILED:
+      case NODE_DISK_ERROR:
+      default:
+        return TaskAttemptEndReason.OTHER;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java b/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
new file mode 100644
index 0000000..96a4768
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+// TODO TEZ-2003 Expose as a public API
+public enum TaskAttemptEndReason {
+  COMMUNICATION_ERROR,
+  SERVICE_BUSY,
+  INTERRUPTED_BY_SYSTEM,
+  INTERRUPTED_BY_USER,
+  OTHER
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
index ef0bb33..7112d9e 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
@@ -41,5 +41,10 @@ public enum TaskAttemptTerminationCause {
   CONTAINER_STOPPED, // Container stopped or released by Tez
   NODE_FAILED, // Node for the container failed
   NODE_DISK_ERROR, // Disk failed on the node runnign the task
-  
+
+  COMMUNICATION_ERROR, // Equivalent to a launch failure
+  SERVICE_BUSY, // Service rejected the task
+  INTERRUPTED_BY_SYSTEM, // Interrupted by the system. e.g. Pre-emption
+  INTERRUPTED_BY_USER, // Interrupted by the user
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index 82eed20..945091e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -45,6 +45,8 @@ public abstract class TaskCommunicator extends AbstractService {
                                                   Credentials credentials,
                                                   boolean credentialsChanged, int priority);
 
+  // TODO TEZ-2003. Are additional APIs required to mark a container as completed ? - for completeness.
+
   // TODO TEZ-2003 Remove reference to TaskAttemptID
   public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index 41675fe..a85fb7f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -14,6 +14,7 @@
 
 package org.apache.tez.dag.api;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 
 import org.apache.hadoop.security.Credentials;
@@ -37,15 +38,21 @@ public interface TaskCommunicatorContext {
   // TODO TEZ-2003 Move to vertex, taskIndex, version
   boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException;
 
+  // TODO TEZ-2003 Split the heartbeat API to a liveness check and a status update
   TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException;
 
   boolean isKnownContainer(ContainerId containerId);
 
-  // TODO TEZ-2003 Move to vertex, taskIndex, version
+  // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt*
   void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId);
 
-  // TODO TEZ-2003 Add an API to register task failure - for example, a communication failure.
-  // This will have to take into consideration the TA_FAILED event
+  // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt*
+  void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics);
+
+  // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt*
+  void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics);
+
+  // TODO TEZ-2003 API. Should a method exist to report that a task succeeded?
 
   // TODO Eventually Add methods to report availability stats to the scheduler.
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index b570301..94f6cae 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -37,14 +37,17 @@ import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskHeartbeatResponse;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
@@ -54,7 +57,10 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.api.TaskHeartbeatRequest;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.rm.container.AMContainerTask;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -257,6 +263,33 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     pingContainerHeartbeatHandler(containerId);
   }
 
+  @Override
+  public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+                         String diagnostics) {
+    // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
+    // and messages from the scheduler will release the container.
+    // TODO TEZ-2003 Maybe consider un-registering here itself, since the task is not active anymore,
+    // instead of waiting for the unregister to flow through the Container.
+    // Fix along the same lines as TEZ-2124 by introducing an explicit context.
+    context.getEventHandler().handle(new TaskAttemptEventAttemptKilled(taskAttemptId,
+        diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
+        taskAttemptEndReason)));
+  }
+
+  @Override
+  public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+                         String diagnostics) {
+    // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
+    // and messages from the scheduler will release the container.
+    // TODO TEZ-2003 Maybe consider un-registering here itself, since the task is not active anymore,
+    // instead of waiting for the unregister to flow through the Container.
+    // Fix along the same lines as TEZ-2124 by introducing an explicit context.
+    context.getEventHandler().handle(new TaskAttemptEventAttemptFailed(taskAttemptId,
+        TaskAttemptEventType.TA_FAILED, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
+        taskAttemptEndReason)));
+  }
+
+
   /**
    * Child checking whether it can commit.
    * <p/>

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
index b9c1d09..7ec8921 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
@@ -26,6 +26,8 @@ public class TaskAttemptEventAttemptFailed extends TaskAttemptEvent
 
   private final String diagnostics;
   private final TaskAttemptTerminationCause errorCause;
+
+  /* Accepted Types - FAILED, TIMED_OUT */
   public TaskAttemptEventAttemptFailed(TezTaskAttemptID id,
       TaskAttemptEventType type, String diagnostics, TaskAttemptTerminationCause errorCause) {
     super(id, type);

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java
new file mode 100644
index 0000000..72e6b07
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+public class TaskAttemptEventAttemptKilled extends TaskAttemptEvent
+    implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
+
+  private final String diagnostics;
+  private final TaskAttemptTerminationCause errorCause;
+  public TaskAttemptEventAttemptKilled(TezTaskAttemptID id,
+                                       String diagnostics,
+                                       TaskAttemptTerminationCause errorCause) {
+    super(id, TaskAttemptEventType.TA_KILLED);
+    this.diagnostics = diagnostics;
+    this.errorCause = errorCause;
+  }
+
+  @Override
+  public String getDiagnosticInfo() {
+    return diagnostics;
+  }
+
+  @Override
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return errorCause;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
index b7aca36..6d20368 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
@@ -26,14 +26,15 @@ public enum TaskAttemptEventType {
 //Producer:Task, Speculator
   TA_SCHEDULE,
 
-//Producer: TaskAttemptListener
+//Producer: TaskAttemptListener | Vertex after routing events
   TA_STARTED_REMOTELY,
   TA_STATUS_UPDATE,
   TA_DIAGNOSTICS_UPDATE, // REMOVE THIS - UNUSED
   TA_DONE,
   TA_FAILED,
+  TA_KILLED, // Generated by TaskCommunicators
   TA_TIMED_OUT,
-  
+
 //Producer: Client, Scheduler, On speculation.
   TA_KILL_REQUEST,
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index c80571d..11d4df9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -185,6 +186,11 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   private final StateMachine<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent> stateMachine;
 
+  // TODO TEZ-2003 We may need some additional state management for STATUS_UPDATES, FAILED, KILLED coming in before
+  // TASK_STARTED_REMOTELY. In case of a PUSH it's more intuitive to send TASK_STARTED_REMOTELY after communicating
+  // with the listening service and getting a response, which in turn can trigger STATUS_UPDATES / FAILED / KILLED
+
+  // TA_KILLED handled the same as TA_KILL_REQUEST. Just a different name indicating a request / already killed.
   private static StateMachineFactory
   <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
   stateMachineFactory
@@ -225,6 +231,10 @@ public class TaskAttemptImpl implements TaskAttempt,
           new TerminatedBeforeRunningTransition(KILLED_HELPER))
       .addTransition(TaskAttemptStateInternal.START_WAIT,
           TaskAttemptStateInternal.KILL_IN_PROGRESS,
+          TaskAttemptEventType.TA_KILLED,
+          new TerminatedBeforeRunningTransition(KILLED_HELPER))
+      .addTransition(TaskAttemptStateInternal.START_WAIT,
+          TaskAttemptStateInternal.KILL_IN_PROGRESS,
           TaskAttemptEventType.TA_NODE_FAILED,
           new NodeFailedBeforeRunningTransition())
       .addTransition(TaskAttemptStateInternal.START_WAIT,
@@ -265,6 +275,10 @@ public class TaskAttemptImpl implements TaskAttempt,
           new TerminatedWhileRunningTransition(KILLED_HELPER))
       .addTransition(TaskAttemptStateInternal.RUNNING,
           TaskAttemptStateInternal.KILL_IN_PROGRESS,
+          TaskAttemptEventType.TA_KILLED,
+          new TerminatedWhileRunningTransition(KILLED_HELPER))
+      .addTransition(TaskAttemptStateInternal.RUNNING,
+          TaskAttemptStateInternal.KILL_IN_PROGRESS,
           TaskAttemptEventType.TA_NODE_FAILED,
           new TerminatedWhileRunningTransition(KILLED_HELPER))
       .addTransition(TaskAttemptStateInternal.RUNNING,
@@ -303,6 +317,7 @@ public class TaskAttemptImpl implements TaskAttempt,
               TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
               TaskAttemptEventType.TA_TIMED_OUT,
               TaskAttemptEventType.TA_KILL_REQUEST,
+              TaskAttemptEventType.TA_KILLED,
               TaskAttemptEventType.TA_NODE_FAILED,
               TaskAttemptEventType.TA_CONTAINER_TERMINATING,
               TaskAttemptEventType.TA_OUTPUT_FAILED))
@@ -324,6 +339,7 @@ public class TaskAttemptImpl implements TaskAttempt,
               TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
               TaskAttemptEventType.TA_TIMED_OUT,
               TaskAttemptEventType.TA_KILL_REQUEST,
+              TaskAttemptEventType.TA_KILLED,
               TaskAttemptEventType.TA_NODE_FAILED,
               TaskAttemptEventType.TA_CONTAINER_TERMINATING,
               TaskAttemptEventType.TA_OUTPUT_FAILED))
@@ -342,6 +358,7 @@ public class TaskAttemptImpl implements TaskAttempt,
               TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
               TaskAttemptEventType.TA_TIMED_OUT,
               TaskAttemptEventType.TA_KILL_REQUEST,
+              TaskAttemptEventType.TA_KILLED,
               TaskAttemptEventType.TA_NODE_FAILED,
               TaskAttemptEventType.TA_CONTAINER_TERMINATING,
               TaskAttemptEventType.TA_CONTAINER_TERMINATED,
@@ -361,6 +378,7 @@ public class TaskAttemptImpl implements TaskAttempt,
               TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
               TaskAttemptEventType.TA_TIMED_OUT,
               TaskAttemptEventType.TA_KILL_REQUEST,
+              TaskAttemptEventType.TA_KILLED,
               TaskAttemptEventType.TA_NODE_FAILED,
               TaskAttemptEventType.TA_CONTAINER_TERMINATING,
               TaskAttemptEventType.TA_CONTAINER_TERMINATED,
@@ -383,6 +401,12 @@ public class TaskAttemptImpl implements TaskAttempt,
           TaskAttemptStateInternal.SUCCEEDED,
           EnumSet.of(TaskAttemptStateInternal.KILLED,
               TaskAttemptStateInternal.SUCCEEDED),
+          TaskAttemptEventType.TA_KILLED,
+          new TerminatedAfterSuccessTransition())
+      .addTransition(
+          TaskAttemptStateInternal.SUCCEEDED,
+          EnumSet.of(TaskAttemptStateInternal.KILLED,
+              TaskAttemptStateInternal.SUCCEEDED),
           TaskAttemptEventType.TA_NODE_FAILED,
           new TerminatedAfterSuccessTransition())
       .addTransition(
@@ -434,7 +458,6 @@ public class TaskAttemptImpl implements TaskAttempt,
     this.leafVertex = leafVertex;
   }
 
-
   @Override
   public TezTaskAttemptID getID() {
     return attemptId;
@@ -1030,6 +1053,7 @@ public class TaskAttemptImpl implements TaskAttempt,
 
       // Compute node/rack location request even if re-scheduled.
       Set<String> racks = new HashSet<String>();
+      // TODO Post TEZ-2003. Allow for a policy in the VMPlugin to define locality for different attempts.
       TaskLocationHint locationHint = ta.getTaskLocationHint();
       if (locationHint != null) {
         if (locationHint.getRacks() != null) {
@@ -1104,6 +1128,8 @@ public class TaskAttemptImpl implements TaskAttempt,
 
     @Override
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+      // This transition should not be invoked directly, if a scheduler event has already been sent out.
+      // Sub-classes should be used if a scheduler request has been sent.
       ta.setFinishTime();
 
       if (event instanceof DiagnosableEvent) {
@@ -1218,7 +1244,8 @@ public class TaskAttemptImpl implements TaskAttempt,
       // Inform the scheduler
       if (sendSchedulerEvent()) {
         ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, helper
-            .getTaskAttemptState(), ta.getVertex().getTaskSchedulerIdentifier()));
+            .getTaskAttemptState(), TezUtilsInternal.toTaskAttemptEndReason(ta.terminationCause),
+            ta.getVertex().getTaskSchedulerIdentifier()));
       }
     }
   }
@@ -1300,7 +1327,7 @@ public class TaskAttemptImpl implements TaskAttempt,
 
       // Inform the Scheduler.
       ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId,
-          TaskAttemptState.SUCCEEDED, ta.getVertex().getTaskSchedulerIdentifier()));
+          TaskAttemptState.SUCCEEDED, null, ta.getVertex().getTaskSchedulerIdentifier()));
 
       // Inform the task.
       ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
index 2ace642..a775948 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
@@ -18,6 +18,7 @@
 package org.apache.tez.dag.app.rm;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -27,14 +28,16 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
   private final TaskAttempt attempt;
   private final ContainerId containerId;
   private final TaskAttemptState state;
+  private final TaskAttemptEndReason taskAttemptEndReason;
   private final int schedulerId;
 
   public AMSchedulerEventTAEnded(TaskAttempt attempt, ContainerId containerId,
-      TaskAttemptState state, int schedulerId) {
+      TaskAttemptState state, TaskAttemptEndReason taskAttemptEndReason, int schedulerId) {
     super(AMSchedulerEventType.S_TA_ENDED);
     this.attempt = attempt;
     this.containerId = containerId;
     this.state = state;
+    this.taskAttemptEndReason = taskAttemptEndReason;
     this.schedulerId = schedulerId;
   }
 
@@ -57,4 +60,8 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
   public int getSchedulerId() {
     return schedulerId;
   }
+
+  public TaskAttemptEndReason getTaskAttemptEndReason() {
+    return taskAttemptEndReason;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index 72a074f..a234e07 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
@@ -149,7 +150,7 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
   }
   
   @Override
-  public boolean deallocateTask(Object task, boolean taskSucceeded) {
+  public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason) {
     return taskRequestHandler.addDeallocateTaskRequest(task);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 8e5fc71..9f09f68 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -289,7 +289,9 @@ public class TaskSchedulerEventHandler extends AbstractService
 
   private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
     TaskAttempt attempt = event.getAttempt();
-    boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt, false);
+    // Propagate state and failure cause (if any) when informing the scheduler about the de-allocation.
+    boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()]
+        .deallocateTask(attempt, false, event.getTaskAttemptEndReason());
     // use stored value of container id in case the scheduler has removed this
     // assignment because the task has been deallocated earlier.
     // retroactive case
@@ -311,6 +313,7 @@ public class TaskSchedulerEventHandler extends AbstractService
       sendEvent(new AMContainerEventStopRequest(attemptContainerId));
       // Inform the Node - the task has asked to be STOPPED / has already
       // stopped.
+      // AMNodeImpl blacklisting logic does not account for KILLED attempts.
       sendEvent(new AMNodeEventTaskAttemptEnded(appContext.getAllContainers().
           get(attemptContainerId).getContainer().getNodeId(), attemptContainerId,
           attempt.getID(), event.getState() == TaskAttemptState.FAILED));
@@ -332,7 +335,7 @@ public class TaskSchedulerEventHandler extends AbstractService
     }
 
     boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt,
-        true);
+        true, null);
     if (!wasContainerAllocated) {
       LOG.error("De-allocated successful task: " + attempt.getID()
           + ", but TaskScheduler reported no container assigned to task");

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
index 48d5455..07dfcd6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
 
 public abstract class TaskSchedulerService extends AbstractService{
 
@@ -61,8 +62,9 @@ public abstract class TaskSchedulerService extends AbstractService{
   public abstract void allocateTask(Object task, Resource capability,
       ContainerId containerId, Priority priority, Object containerSignature,
       Object clientCookie);
-  
-  public abstract boolean deallocateTask(Object task, boolean taskSucceeded);
+
+  /** Plugin writers must ensure that a container is de-allocated once it is done, so that it can be collected. */
+  public abstract boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason);
 
   public abstract Object deallocateContainer(ContainerId containerId);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 44f5484..1fc9ac2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
@@ -987,10 +988,13 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
    *          the task to de-allocate.
    * @param taskSucceeded
    *          specify whether the task succeeded or failed.
+   * @param endReason
+   *          reason for the task ending; may be null (callers pass null when no reason is available)
    * @return true if a container is assigned to this task.
    */
   @Override
-  public boolean deallocateTask(Object task, boolean taskSucceeded) {
+  public boolean deallocateTask(Object task, boolean taskSucceeded,
+                                TaskAttemptEndReason endReason) {
     Map<CookieContainerRequest, Container> assignedContainers = null;
 
     synchronized (this) {
@@ -1180,7 +1184,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
             CookieContainerRequest request = entry.getValue();
             if (request.getPriority().equals(lowestPriNewContainer.getPriority())) {
               LOG.info("Resending request for task again: " + task);
-              deallocateTask(task, true);
+              deallocateTask(task, true, null);
               allocateTask(task, request.getCapability(), 
                   (request.getNodes() == null ? null : 
                     request.getNodes().toArray(new String[request.getNodes().size()])), 

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 0cf1959..076f9e0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -76,6 +76,7 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 @SuppressWarnings("unchecked")
+// TODO TEZ-2003 Rename to TestTezTaskCommunicator
 public class TestTaskAttemptListenerImplTezDag {
   private ApplicationId appId;
   private AppContext appContext;

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
new file mode 100644
index 0000000..934543f
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
+import org.apache.tez.dag.app.rm.container.AMContainer;
+import org.apache.tez.dag.app.rm.container.AMContainerMap;
+import org.apache.tez.dag.app.rm.container.AMContainerTask;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+// TODO TEZ-2003. Rename to TestTaskAttemptListener, or to match whatever TaskAttemptListener is renamed to.
+public class TestTaskAttemptListenerImplTezDag2 {
+
+  @Test(timeout = 5000)
+  public void testTaskAttemptFailedKilled() {
+    ApplicationId appId = ApplicationId.newInstance(1000, 1);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+    Credentials credentials = new Credentials();
+    AppContext appContext = mock(AppContext.class);
+    EventHandler eventHandler = mock(EventHandler.class);
+    DAG dag = mock(DAG.class);
+    AMContainerMap amContainerMap = mock(AMContainerMap.class);
+    Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
+    doReturn(eventHandler).when(appContext).getEventHandler();
+    doReturn(dag).when(appContext).getCurrentDAG();
+    doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
+    doReturn(credentials).when(appContext).getAppCredentials();
+    doReturn(appAcls).when(appContext).getApplicationACLs();
+    doReturn(amContainerMap).when(appContext).getAllContainers();
+    NodeId nodeId = NodeId.newInstance("localhost", 0);
+    AMContainer amContainer = mock(AMContainer.class);
+    Container container = mock(Container.class);
+    doReturn(nodeId).when(container).getNodeId();
+    doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class));
+    doReturn(container).when(amContainer).getContainer();
+
+    TaskAttemptListenerImpTezDag taskAttemptListener =
+        new TaskAttemptListenerImpTezDag(appContext, mock(TaskHeartbeatHandler.class),
+            mock(ContainerHeartbeatHandler.class), null, null, false);
+
+    TaskSpec taskSpec1 = mock(TaskSpec.class);
+    TezTaskAttemptID taskAttemptId1 = mock(TezTaskAttemptID.class);
+    doReturn(taskAttemptId1).when(taskSpec1).getTaskAttemptID();
+    AMContainerTask amContainerTask1 = new AMContainerTask(taskSpec1, null, null, false, 10);
+
+    TaskSpec taskSpec2 = mock(TaskSpec.class);
+    TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class);
+    doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID();
+    AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec2, null, null, false, 10);
+
+    ContainerId containerId1 = createContainerId(appId, 1);
+    taskAttemptListener.registerRunningContainer(containerId1, 0);
+    taskAttemptListener.registerTaskAttempt(amContainerTask1, containerId1, 0);
+    ContainerId containerId2 = createContainerId(appId, 2);
+    taskAttemptListener.registerRunningContainer(containerId2, 0);
+    taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId2, 0);
+
+
+    taskAttemptListener
+        .taskFailed(taskAttemptId1, TaskAttemptEndReason.COMMUNICATION_ERROR, "Diagnostics1");
+    taskAttemptListener
+        .taskKilled(taskAttemptId2, TaskAttemptEndReason.SERVICE_BUSY, "Diagnostics2");
+
+    ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(2)).handle(argumentCaptor.capture());
+    assertTrue(argumentCaptor.getAllValues().get(0) instanceof TaskAttemptEventAttemptFailed);
+    assertTrue(argumentCaptor.getAllValues().get(1) instanceof TaskAttemptEventAttemptKilled);
+    TaskAttemptEventAttemptFailed failedEvent =
+        (TaskAttemptEventAttemptFailed) argumentCaptor.getAllValues().get(0);
+    TaskAttemptEventAttemptKilled killedEvent =
+        (TaskAttemptEventAttemptKilled) argumentCaptor.getAllValues().get(1);
+
+    assertEquals("Diagnostics1", failedEvent.getDiagnosticInfo());
+    assertEquals(TaskAttemptTerminationCause.COMMUNICATION_ERROR,
+        failedEvent.getTerminationCause());
+
+    assertEquals("Diagnostics2", killedEvent.getDiagnosticInfo());
+    assertEquals(TaskAttemptTerminationCause.SERVICE_BUSY, killedEvent.getTerminationCause());
+    // TODO TEZ-2003. Verify unregistration from the registered list
+  }
+
+  private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) {
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
+    ContainerId containerId = ContainerId.newInstance(appAttemptId, containerIdx);
+    return containerId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index c1169ef..d45346a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -57,6 +58,7 @@ import org.apache.tez.common.MockDNSToSwitchMapping;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -92,6 +94,7 @@ import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.internal.matchers.Null;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -223,9 +226,9 @@ public class TestContainerReuse {
 
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(
-        ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED, 0));
+        ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
+    verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
     verify(taskSchedulerEventHandler, times(1)).taskAllocated(
       eq(ta31), any(Object.class), eq(containerHost1));
     verify(rmClient, times(0)).releaseAssignedContainer(
@@ -235,7 +238,7 @@ public class TestContainerReuse {
 
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
-        TaskAttemptState.SUCCEEDED, 0));
+        TaskAttemptState.SUCCEEDED, null, 0));
 
     long currentTs = System.currentTimeMillis();
     Throwable exception = null;
@@ -356,9 +359,9 @@ public class TestContainerReuse {
 
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
-        TaskAttemptState.SUCCEEDED, 0));
+        TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta21), eq(true));
+    verify(taskScheduler).deallocateTask(eq(ta21), eq(true), eq((TaskAttemptEndReason)null));
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(
       eq(ta31), any(Object.class), eq(containerHost2));
     verify(rmClient, times(1)).releaseAssignedContainer(
@@ -459,9 +462,9 @@ public class TestContainerReuse {
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(container1));
 
     // Task assigned to container completed successfully. Container should be re-used.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
+    verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta12), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -469,19 +472,19 @@ public class TestContainerReuse {
 
     // Task assigned to container completed successfully.
     // Verify reuse across hosts.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta12), eq(true));
+    verify(taskScheduler).deallocateTask(eq(ta12), eq(true), eq((TaskAttemptEndReason)null));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta13), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     eventHandler.reset();
 
     // Verify no re-use if a previous task fails.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, 0));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, null, 0));
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class), eq(container1));
-    verify(taskScheduler).deallocateTask(eq(ta13), eq(false));
+    verify(taskScheduler).deallocateTask(eq(ta13), eq(false), eq((TaskAttemptEndReason)null));
     verify(rmClient).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
     eventHandler.reset();
@@ -496,9 +499,9 @@ public class TestContainerReuse {
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta14), any(Object.class), eq(container2));
 
     // Task assigned to container completed successfully. No pending requests. Container should be released.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, 0));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta14), eq(true));
+    verify(taskScheduler).deallocateTask(eq(ta14), eq(true), eq((TaskAttemptEndReason)null));
     verify(rmClient).releaseAssignedContainer(eq(container2.getId()));
     eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
     eventHandler.reset();
@@ -607,9 +610,9 @@ public class TestContainerReuse {
 
     // First task had profiling on. This container can not be reused further.
     taskSchedulerEventHandler.handleEvent(
-        new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
+        new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
+    verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta12), any(Object.class),
         eq(container1));
     verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId()));
@@ -653,9 +656,9 @@ public class TestContainerReuse {
 
     // Verify that the container can not be reused when profiling option is turned on
     // Even for 2 tasks having same profiling option can have container reusability.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, 0));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta13), eq(true));
+    verify(taskScheduler).deallocateTask(eq(ta13), eq(true), eq((TaskAttemptEndReason)null));
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class),
         eq(container2));
     verify(rmClient, times(1)).releaseAssignedContainer(eq(container2.getId()));
@@ -698,9 +701,9 @@ public class TestContainerReuse {
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta15), any(Object.class), eq(container3));
 
     //Ensure task 6 (of vertex 1) is allocated to same container
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, 0));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta15), eq(true));
+    verify(taskScheduler).deallocateTask(eq(ta15), eq(true), eq((TaskAttemptEndReason)null));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta16), any(Object.class), eq(container3));
     eventHandler.reset();
 
@@ -811,9 +814,9 @@ public class TestContainerReuse {
     // until delay expires.
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(ta11, container1.getId(),
-        TaskAttemptState.SUCCEEDED, 0));
+        TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
+    verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(
       eq(ta12), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
@@ -828,7 +831,7 @@ public class TestContainerReuse {
     // TA12 completed.
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(ta12, container1.getId(),
-        TaskAttemptState.SUCCEEDED, 0));
+        TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
     LOG.info("Sleeping to ensure that the scheduling loop runs");
     Thread.sleep(3000l);
@@ -946,9 +949,9 @@ public class TestContainerReuse {
     // Container should  be assigned to task21.
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(ta11, container1.getId(),
-        TaskAttemptState.SUCCEEDED, 0));
+        TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
+    verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
     verify(taskSchedulerEventHandler).taskAllocated(
       eq(ta21), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
@@ -956,7 +959,7 @@ public class TestContainerReuse {
     // Task 2 completes.
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(ta21, container1.getId(),
-        TaskAttemptState.SUCCEEDED, 0));
+        TaskAttemptState.SUCCEEDED, null, 0));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
 
     LOG.info("Sleeping to ensure that the scheduling loop runs");
@@ -1065,9 +1068,9 @@ public class TestContainerReuse {
     assertEquals(1, assignEvent.getRemoteTaskLocalResources().size());
     
     // Task assigned to container completed successfully. Container should be re-used.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta111), eq(true));
+    verify(taskScheduler).deallocateTask(eq(ta111), eq(true), eq((TaskAttemptEndReason)null));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta112), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -1077,9 +1080,9 @@ public class TestContainerReuse {
 
     // Task assigned to container completed successfully.
     // Verify reuse across hosts.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta112), eq(true));
+    verify(taskScheduler).deallocateTask(eq(ta112), eq(true), eq((TaskAttemptEndReason)null));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     eventHandler.reset();
@@ -1118,9 +1121,9 @@ public class TestContainerReuse {
     assertEquals(2, assignEvent.getRemoteTaskLocalResources().size());
     eventHandler.reset();
 
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta211), eq(true));
+    verify(taskScheduler).deallocateTask(eq(ta211), eq(true), eq((TaskAttemptEndReason)null));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta212), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
index 25cf4b5..0a642bb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
@@ -94,7 +95,7 @@ public class TestLocalTaskSchedulerService {
 
     Task task = mock(Task.class);
     taskSchedulerService.allocateTask(task, Resource.newInstance(1024, 1), null, null, Priority.newInstance(1), null, null);
-    taskSchedulerService.deallocateTask(task, false);
+    taskSchedulerService.deallocateTask(task, false, null);
     // start the RequestHandler, DeallocateTaskRequest has higher priority, so will be processed first
     taskSchedulerService.startRequestHandlerThread();
 
@@ -126,7 +127,7 @@ public class TestLocalTaskSchedulerService {
 
     MockAsyncDelegateRequestHandler requestHandler = taskSchedulerService.getRequestHandler();
     requestHandler.drainRequest(1);
-    taskSchedulerService.deallocateTask(task, false);
+    taskSchedulerService.deallocateTask(task, false, null);
     requestHandler.drainRequest(2);
     assertEquals(1, requestHandler.deallocateCount);
     assertEquals(1, requestHandler.allocateCount);

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index dabae67..807e772 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -172,7 +172,7 @@ public class TestTaskScheduler {
                            addContainerRequest((CookieContainerRequest) any());
 
     // returned from task requests before allocation happens
-    assertFalse(scheduler.deallocateTask(mockTask1, true));
+    assertFalse(scheduler.deallocateTask(mockTask1, true, null));
     verify(mockApp, times(0)).containerBeingReleased(any(ContainerId.class));
     verify(mockRMClient, times(1)).
                         removeContainerRequest((CookieContainerRequest) any());
@@ -180,7 +180,7 @@ public class TestTaskScheduler {
                                  releaseAssignedContainer((ContainerId) any());
 
     // deallocating unknown task
-    assertFalse(scheduler.deallocateTask(mockTask1, true));
+    assertFalse(scheduler.deallocateTask(mockTask1, true, null));
     verify(mockApp, times(0)).containerBeingReleased(any(ContainerId.class));
     verify(mockRMClient, times(1)).
                         removeContainerRequest((CookieContainerRequest) any());
@@ -325,7 +325,7 @@ public class TestTaskScheduler {
     verify(mockRMClient).releaseAssignedContainer(mockCId4);
 
     // deallocate allocated task
-    assertTrue(scheduler.deallocateTask(mockTask1, true));
+    assertTrue(scheduler.deallocateTask(mockTask1, true, null));
     drainableAppCallback.drain();
     verify(mockApp).containerBeingReleased(mockCId1);
     verify(mockRMClient).releaseAssignedContainer(mockCId1);
@@ -445,7 +445,7 @@ public class TestTaskScheduler {
     verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any());
     verify(mockApp).taskAllocated(mockTask4, mockCookie4, mockContainer6);
     // deallocate allocated task
-    assertTrue(scheduler.deallocateTask(mockTask4, true));
+    assertTrue(scheduler.deallocateTask(mockTask4, true, null));
     drainableAppCallback.drain();
     verify(mockApp).containerBeingReleased(mockCId6);
     verify(mockRMClient).releaseAssignedContainer(mockCId6);
@@ -475,7 +475,7 @@ public class TestTaskScheduler {
         removeContainerRequest((CookieContainerRequest) any());
     verify(mockRMClient, times(8)).addContainerRequest(
         (CookieContainerRequest) any());
-    assertFalse(scheduler.deallocateTask(mockTask1, true));
+    assertFalse(scheduler.deallocateTask(mockTask1, true, null));
 
     List<NodeReport> mockUpdatedNodes = mock(List.class);
     scheduler.onNodesUpdated(mockUpdatedNodes);
@@ -741,7 +741,7 @@ public class TestTaskScheduler {
     verify(mockRMClient).releaseAssignedContainer(mockCId4);
 
     // deallocate allocated task
-    assertTrue(scheduler.deallocateTask(mockTask1, true));
+    assertTrue(scheduler.deallocateTask(mockTask1, true, null));
     drainableAppCallback.drain();
     verify(mockApp).containerBeingReleased(mockCId1);
     verify(mockRMClient).releaseAssignedContainer(mockCId1);
@@ -871,7 +871,7 @@ public class TestTaskScheduler {
     verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any());
     verify(mockApp).taskAllocated(mockTask4, mockCookie4, mockContainer6);
     // deallocate allocated task
-    assertTrue(scheduler.deallocateTask(mockTask4, true));
+    assertTrue(scheduler.deallocateTask(mockTask4, true, null));
     drainableAppCallback.drain();
     verify(mockApp).containerBeingReleased(mockCId6);
     verify(mockRMClient).releaseAssignedContainer(mockCId6);
@@ -960,8 +960,8 @@ public class TestTaskScheduler {
     // container7 allocated to the task with affinity for it
     verify(mockApp).taskAllocated(mockTask6, mockCookie6, mockContainer7);
     // deallocate allocated task
-    assertTrue(scheduler.deallocateTask(mockTask5, true));
-    assertTrue(scheduler.deallocateTask(mockTask6, true));
+    assertTrue(scheduler.deallocateTask(mockTask5, true, null));
+    assertTrue(scheduler.deallocateTask(mockTask6, true, null));
     drainableAppCallback.drain();
     verify(mockApp).containerBeingReleased(mockCId7);
     verify(mockApp).containerBeingReleased(mockCId8);

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
index 5657f86..872d592 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.service.TezTestServiceConfConstants;
 
@@ -198,7 +199,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
   }
 
   @Override
-  public boolean deallocateTask(Object task, boolean taskSucceeded) {
+  public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason) {
     ContainerId containerId = runningTasks.remove(task);
     if (containerId == null) {
       LOG.error("Could not determine ContainerId for task: " + task +

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
index a327caf..e3385a2 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -19,16 +19,20 @@ import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.RejectedExecutionException;
 
 import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
 import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
 import org.apache.tez.dag.app.TezTestServiceCommunicator;
@@ -83,6 +87,7 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
   @Override
   public void serviceStop() {
     super.serviceStop();
+    this.communicator.stop();
   }
 
 
@@ -123,13 +128,15 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
       throw new RuntimeException("ContainerInfo not found for container: " + containerId +
           ", while trying to launch task: " + taskSpec.getTaskAttemptID());
     }
+    // Have to register this up front right now. Otherwise, it's possible for the task to start
+    // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them.
+    getTaskCommunicatorContext()
+        .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
     communicator.submitWork(requestProto, host, port,
         new TezTestServiceCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() {
           @Override
           public void setResponse(SubmitWorkResponseProto response) {
             LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID());
-            getTaskCommunicatorContext()
-                .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
           }
 
           @Override
@@ -137,6 +144,31 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
             // TODO Handle this error. This is where an API on the context to indicate failure / rejection comes in.
             LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
                 containerId, t);
+            if (t instanceof ServiceException) {
+              ServiceException se = (ServiceException) t;
+              t = se.getCause();
+            }
+            if (t instanceof RemoteException) {
+              RemoteException re = (RemoteException)t;
+              String message = re.toString();
+              if (message.contains(RejectedExecutionException.class.getName())) {
+                getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(),
+                    TaskAttemptEndReason.SERVICE_BUSY, "Service Busy");
+              } else {
+                getTaskCommunicatorContext()
+                    .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
+                        t.toString());
+              }
+            } else {
+              if (t instanceof IOException) {
+                getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(),
+                    TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error");
+              } else {
+                getTaskCommunicatorContext()
+                    .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
+                        t.getMessage());
+              }
+            }
           }
         });
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
index 2bca4ed..28c2286 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
@@ -16,12 +16,13 @@ package org.apache.tez.service;
 
 import java.io.IOException;
 
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
 import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
 import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
 
 public interface ContainerRunner {
 
-  void queueContainer(RunContainerRequestProto request) throws IOException;
-  void submitWork(SubmitWorkRequestProto request) throws IOException;
+  void queueContainer(RunContainerRequestProto request) throws TezException;
+  void submitWork(SubmitWorkRequestProto request) throws TezException;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
index f47bd67..0ac0b33 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
@@ -133,7 +133,10 @@ public class MiniTezTestServiceCluster extends AbstractService {
 
   @Override
   public void serviceStop() {
-    tezTestService.stop();
+    if (tezTestService != null) {
+      tezTestService.stop();
+      tezTestService = null;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
index 25d6030..379d952 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -58,6 +59,7 @@ import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.task.TaskReporter;
 import org.apache.tez.runtime.task.TezTaskRunner;
 import org.apache.tez.service.ContainerRunner;
@@ -68,14 +70,18 @@ import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
 import org.apache.tez.runtime.task.TezChild;
 import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult;
 import org.apache.tez.shufflehandler.ShuffleHandler;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
 import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
 import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.TaskSpecProto;
 import org.apache.tez.util.ProtoConverters;
 
 public class ContainerRunnerImpl extends AbstractService implements ContainerRunner {
 
   private static final Logger LOG = Logger.getLogger(ContainerRunnerImpl.class);
 
+  public static final String DAG_NAME_INSTRUMENTED_FAILURES = "InstrumentedFailures";
+
   private final ListeningExecutorService executorService;
   private final AtomicReference<InetSocketAddress> localAddress;
   private final String[] localDirsBase;
@@ -146,10 +152,10 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
    * Submit a container which is ready for running.
    * The regular pull mechanism will be used to fetch work from the AM
    * @param request
-   * @throws IOException
+   * @throws TezException wrapping any IOException from creating local dirs or reading credentials
    */
   @Override
-  public void queueContainer(RunContainerRequestProto request) throws IOException {
+  public void queueContainer(RunContainerRequestProto request) throws TezException {
     LOG.info("Queuing container for execution: " + request);
 
     Map<String, String> env = new HashMap<String, String>();
@@ -162,7 +168,11 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
     for (int i = 0; i < localDirsBase.length; i++) {
       localDirs[i] = createAppSpecificLocalDir(localDirsBase[i], request.getApplicationIdString(),
           request.getUser());
-      localFs.mkdirs(new Path(localDirs[i]));
+      try {
+        localFs.mkdirs(new Path(localDirs[i]));
+      } catch (IOException e) {
+        throw new TezException(e);
+      }
     }
     LOG.info("DEBUG: Dirs are: " + Arrays.toString(localDirs));
 
@@ -175,7 +185,11 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
     DataInputBuffer dib = new DataInputBuffer();
     byte[] tokenBytes = request.getCredentialsBinary().toByteArray();
     dib.reset(tokenBytes, tokenBytes.length);
-    credentials.readTokenStorageStream(dib);
+    try {
+      credentials.readTokenStorageStream(dib);
+    } catch (IOException e) {
+      throw new TezException(e);
+    }
 
     Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
 
@@ -197,13 +211,14 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
    * This is intended for a task push from the AM
    *
    * @param request
-   * @throws IOException
+   * @throws TezException on local dir creation or credential read failure, or a simulated test failure
    */
   @Override
-  public void submitWork(SubmitWorkRequestProto request) throws
-      IOException {
+  public void submitWork(SubmitWorkRequestProto request) throws TezException {
     LOG.info("Queuing work for execution: " + request);
 
+    checkAndThrowExceptionForTests(request);
+
     Map<String, String> env = new HashMap<String, String>();
     env.putAll(localEnv);
     env.put(ApplicationConstants.Environment.USER.name(), request.getUser());
@@ -214,7 +229,11 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
     for (int i = 0; i < localDirsBase.length; i++) {
       localDirs[i] = createAppSpecificLocalDir(localDirsBase[i], request.getApplicationIdString(),
           request.getUser());
-      localFs.mkdirs(new Path(localDirs[i]));
+      try {
+        localFs.mkdirs(new Path(localDirs[i]));
+      } catch (IOException e) {
+        throw new TezException(e);
+      }
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Dirs are: " + Arrays.toString(localDirs));
@@ -228,7 +247,11 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
     DataInputBuffer dib = new DataInputBuffer();
     byte[] tokenBytes = request.getCredentialsBinary().toByteArray();
     dib.reset(tokenBytes, tokenBytes.length);
-    credentials.readTokenStorageStream(dib);
+    try {
+      credentials.readTokenStorageStream(dib);
+    } catch (IOException e) {
+      throw new TezException(e);
+    }
 
     Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
 
@@ -509,4 +532,23 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
     }
   }
 
+
+  private void checkAndThrowExceptionForTests(SubmitWorkRequestProto request) throws TezException {
+    if (!request.getTaskSpec().getDagName().equals(DAG_NAME_INSTRUMENTED_FAILURES)) {
+      return;
+    }
+
+    TaskSpec taskSpec = ProtoConverters.getTaskSpecfromProto(request.getTaskSpec());
+    if (taskSpec.getTaskAttemptID().getTaskID().getId() == 0 &&
+        taskSpec.getTaskAttemptID().getId() == 0) {
+      LOG.info("Simulating Rejected work");
+      throw new RejectedExecutionException(
+          "Simulating Rejected work for taskAttemptId=" + taskSpec.getTaskAttemptID());
+    } else if (taskSpec.getTaskAttemptID().getTaskID().getId() == 1 &&
+        taskSpec.getTaskAttemptID().getId() == 0) {
+      LOG.info("Simulating Task Setup Failure during launch");
+      throw new TezException("Simulating Task Setup Failure during launch for taskAttemptId=" +
+          taskSpec.getTaskAttemptID());
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
index 012e352..855f1b0 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
@@ -14,7 +14,6 @@
 
 package org.apache.tez.service.impl;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -25,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Logger;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.service.ContainerRunner;
 import org.apache.tez.shufflehandler.ShuffleHandler;
 import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
@@ -108,14 +108,14 @@ public class TezTestService extends AbstractService implements ContainerRunner {
 
 
   @Override
-  public void queueContainer(RunContainerRequestProto request) throws IOException {
+  public void queueContainer(RunContainerRequestProto request) throws TezException {
     numSubmissions.incrementAndGet();
     containerRunner.queueContainer(request);
   }
 
   @Override
   public void submitWork(TezTestServiceProtocolProtos.SubmitWorkRequestProto request) throws
-      IOException {
+      TezException {
     numSubmissions.incrementAndGet();
     containerRunner.submitWork(request);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java
index d7f8444..39d7156 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java
@@ -30,11 +30,13 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.service.ContainerRunner;
 import org.apache.tez.service.TezTestServiceProtocolBlockingPB;
 import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
 import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
 import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerResponseProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
 import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkResponseProto;
 
 public class TezTestServiceProtocolServerImpl extends AbstractService
@@ -61,20 +63,20 @@ public class TezTestServiceProtocolServerImpl extends AbstractService
     LOG.info("Received request: " + request);
     try {
       containerRunner.queueContainer(request);
-    } catch (IOException e) {
+    } catch (TezException e) {
       throw new ServiceException(e);
     }
     return RunContainerResponseProto.getDefaultInstance();
   }
 
   @Override
-  public SubmitWorkResponseProto submitWork(RpcController controller, TezTestServiceProtocolProtos.SubmitWorkRequestProto request) throws
+  public SubmitWorkResponseProto submitWork(RpcController controller, SubmitWorkRequestProto request) throws
       ServiceException {
     LOG.info("Received submitWork request: " + request);
     try {
       containerRunner.submitWork(request);
-    } catch (IOException e) {
-      e.printStackTrace();
+    } catch (TezException e) {
+      throw new ServiceException(e);
     }
     return SubmitWorkResponseProto.getDefaultInstance();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
index 0ec972b..b6a166d 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
@@ -27,16 +27,23 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
 import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService;
 import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
 import org.apache.tez.examples.HashJoinExample;
 import org.apache.tez.examples.JoinDataGen;
 import org.apache.tez.examples.JoinValidateConfigured;
+import org.apache.tez.runtime.library.processor.SleepProcessor;
 import org.apache.tez.service.MiniTezTestServiceCluster;
+import org.apache.tez.service.impl.ContainerRunnerImpl;
 import org.apache.tez.test.MiniTezCluster;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -283,6 +290,28 @@ public class TestExternalTezServices {
         PROPS_IN_AM, PROPS_REGULAR_CONTAINERS);
   }
 
+  @Test(timeout = 60000)
+  public void testErrorPropagation() throws TezException, InterruptedException, IOException {
+    runExceptionSimulation();
+  }
+
+
+
+  private void runExceptionSimulation() throws IOException, TezException, InterruptedException {
+    DAG dag = DAG.create(ContainerRunnerImpl.DAG_NAME_INSTRUMENTED_FAILURES);
+    Vertex v =Vertex.create("Vertex1", ProcessorDescriptor.create(SleepProcessor.class.getName()),
+        3);
+    for (Map.Entry<String, String> prop : PROPS_EXT_SERVICE_PUSH.entrySet()) {
+      v.setConf(prop.getKey(), prop.getValue());
+    }
+    dag.addVertex(v);
+    DAGClient dagClient = sharedTezClient.submitDAG(dag);
+    DAGStatus dagStatus = dagClient.waitForCompletion();
+    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+    assertEquals(1, dagStatus.getDAGProgress().getFailedTaskAttemptCount());
+    assertEquals(1, dagStatus.getDAGProgress().getKilledTaskAttemptCount());
+
+  }
 
   private void runJoinValidate(String name, int extExpectedCount, Map<String, String> lhsProps,
                                Map<String, String> rhsProps,