You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2016/02/17 22:39:44 UTC

[2/2] tez git commit: TEZ-3029. Add an onError method to service plugin contexts. (sseth)

TEZ-3029. Add an onError method to service plugin contexts. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a812c346
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a812c346
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a812c346

Branch: refs/heads/master
Commit: a812c3462808e73b8a59e1852ff2547dcbafbf84
Parents: fec46aa
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Feb 17 13:39:11 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Feb 17 13:39:11 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../api/ContainerLauncherContext.java           |  12 +-
 .../apache/tez/serviceplugins/api/DagInfo.java  |  30 +++
 .../api/ServicePluginContextBase.java           |  49 ++++
 .../serviceplugins/api/ServicePluginError.java  |  48 ++++
 .../api/ServicePluginErrorDefaults.java         |  76 ++++++
 .../api/TaskSchedulerContext.java               |  19 +-
 tez-dag/src/main/java/org/apache/tez/Utils.java |  33 +++
 .../tez/dag/api/client/DAGClientHandler.java    |   5 +-
 .../dag/app/ContainerLauncherContextImpl.java   |  27 ++-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  24 +-
 .../dag/app/TaskCommunicatorContextImpl.java    |  17 ++
 .../tez/dag/app/TaskCommunicatorManager.java    |  26 ++
 .../app/TaskCommunicatorManagerInterface.java   |   4 +
 .../java/org/apache/tez/dag/app/dag/DAG.java    |   3 +-
 .../tez/dag/app/dag/DAGTerminationCause.java    |   3 +
 .../tez/dag/app/dag/VertexTerminationCause.java |   2 +-
 ...DAGAppMasterEventSchedulingServiceError.java |  15 +-
 .../dag/app/dag/event/DAGEventTerminateDag.java |  38 +++
 .../tez/dag/app/dag/event/DAGEventType.java     |   4 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  82 ++++---
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   2 +-
 .../app/launcher/ContainerLauncherManager.java  |  59 ++++-
 .../dag/app/rm/TaskSchedulerContextImpl.java    |  22 +-
 .../app/rm/TaskSchedulerContextImplWrapper.java |  33 ++-
 .../tez/dag/app/rm/TaskSchedulerManager.java    |  36 ++-
 .../dag/app/rm/YarnTaskSchedulerService.java    |   6 +-
 .../app/rm/YarnTaskSchedulerServiceError.java   |  33 +++
 .../api/TaskCommunicatorContext.java            |  16 +-
 .../dag/api/client/TestDAGClientHandler.java    |   4 +-
 .../apache/tez/dag/app/MockDAGAppMaster.java    |   6 +-
 .../tez/dag/app/TestMockDAGAppMaster.java       |   3 +-
 .../dag/app/TestTaskCommunicatorManager.java    | 136 ++++++++++-
 .../apache/tez/dag/app/dag/impl/TestCommit.java |  87 +++++--
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |  84 +++++--
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  13 +-
 .../launcher/TestContainerLauncherManager.java  | 101 +++++++-
 .../tez/dag/app/rm/TestTaskScheduler.java       |  19 +-
 .../dag/app/rm/TestTaskSchedulerHelpers.java    |  15 +-
 .../dag/app/rm/TestTaskSchedulerManager.java    | 161 ++++++++++++
 .../tez/dag/helpers/DagInfoImplForTest.java     |  38 +++
 .../tez/dag/app/ErrorPluginConfiguration.java   | 134 ++++++++++
 ...zTestServiceContainerLauncherWithErrors.java |  17 +-
 ...stServiceTaskSchedulerServiceWithErrors.java |  23 +-
 ...ezTestServiceTaskCommunicatorWithErrors.java |  22 +-
 .../tests/TestExternalTezServicesErrors.java    | 243 +++++++++++++++----
 46 files changed, 1584 insertions(+), 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e2f77f6..af643dd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 Release 0.8.3: Unreleased
 
 INCOMPATIBLE CHANGES
+  TEZ-3029. Add an onError method to service plugin contexts.
 
 ALL CHANGES:
   TEZ-3103. Shuffle can hang when memory to memory merging enabled

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
index 70a3498..ed1d58f 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
@@ -14,15 +14,15 @@
 
 package org.apache.tez.serviceplugins.api;
 
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.dag.api.UserPayload;
 
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
-public interface ContainerLauncherContext {
+public interface ContainerLauncherContext extends ServicePluginContextBase {
 
   // TODO TEZ-2003 (post) TEZ-2664 Tez abstraction for ContainerId, NodeId, other YARN constructs
 
@@ -77,13 +77,6 @@ public interface ContainerLauncherContext {
   // Lookup APIs
 
   /**
-   * Get the UserPayload that was configured while setting up the launcher
-   *
-   * @return the initially configured user payload
-   */
-  UserPayload getInitialUserPayload();
-
-  /**
    * Get the number of nodes being handled by the specified source
    *
    * @param sourceName the relevant source name
@@ -108,4 +101,5 @@ public interface ContainerLauncherContext {
    *
    */
   Object getTaskCommunicatorMetaInfo(String taskCommName);
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-api/src/main/java/org/apache/tez/serviceplugins/api/DagInfo.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/DagInfo.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/DagInfo.java
new file mode 100644
index 0000000..ef73343
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/DagInfo.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+public interface DagInfo {
+
+  /**
+   * The index of the current dag
+   * @return a numerical identifier for the DAG. This is unique within the currently running application.
+   */
+  int getIndex();
+
+  /**
+   * Get the name of the dag
+   * @return the name of the dag
+   */
+  String getName();
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginContextBase.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginContextBase.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginContextBase.java
new file mode 100644
index 0000000..90a51b2
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginContextBase.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.apache.tez.dag.api.UserPayload;
+
+/**
+ * Base interface for ServicePluginContexts
+ */
+public interface ServicePluginContextBase {
+
+  /**
+   * Get the UserPayload that was configured while setting up the service plugin
+   *
+   * @return the initially configured user payload
+   */
+  UserPayload getInitialUserPayload();
+
+  /**
+   * Get information on the currently executing dag
+   * @return info on the currently running dag, or null if no dag is executing
+   */
+  @Nullable
+  DagInfo getCurrentDagInfo();
+
+  /**
+   * Report an error from the service. This results in the specific DAG being killed.
+   *
+   * @param servicePluginError the error category
+   * @param message      A diagnostic message associated with this error
+   * @param dagInfo      the affected dag
+   */
+  void reportError(@Nonnull ServicePluginError servicePluginError, String message, DagInfo dagInfo);
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginError.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginError.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginError.java
new file mode 100644
index 0000000..932c0fa
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginError.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Represents errors from a ServicePlugin. The default implementation {@link ServicePluginErrorDefaults}
+ * lists a basic set of errors.
+ * This can be extended by implementing this interface, if the default set is not adequate.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ServicePluginError {
+
+  /** Classifies an error as either temporary or permanent. */
+  enum ErrorType {
+    TEMPORARY, PERMANENT,
+  }
+
+  /**
+   * Get the enum representation
+   *
+   * @return an enum representation of the ServicePluginError
+   */
+  Enum getEnum();
+
+  /**
+   * The type of the error
+   *
+   * @return the type of the error
+   */
+  ErrorType getErrorType();
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginErrorDefaults.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginErrorDefaults.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginErrorDefaults.java
new file mode 100644
index 0000000..83a85b5
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginErrorDefaults.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A default set of errors from ServicePlugins
+ *
+ * Errors are marked as fatal (PERMANENT) or non-fatal (TEMPORARY) for the Application.
+ * Fatal errors cause the AM to go down.
+ *
+ */
+@InterfaceAudience.Public
+public enum ServicePluginErrorDefaults implements ServicePluginError {
+  /**
+   * Indicates that the service is currently unavailable.
+   * This is a temporary error.
+   */
+  SERVICE_UNAVAILABLE(ErrorType.TEMPORARY),
+
+  /** Indicates that the service is in an inconsistent state.
+   * This is a fatal error.
+   */
+  INCONSISTENT_STATE(ErrorType.PERMANENT),
+
+  /**
+   * Other temporary error.
+   */
+  OTHER(ErrorType.TEMPORARY),
+
+  /**
+   * Other fatal error.
+   */
+  OTHER_FATAL(ErrorType.PERMANENT);
+
+  private final ErrorType errorType;
+
+  ServicePluginErrorDefaults(ErrorType errorType) {
+    this.errorType = errorType;
+  }
+
+  @Override
+  public Enum getEnum() {
+    return this;
+  }
+
+  @Override
+  public ErrorType getErrorType() {
+    return errorType;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
index a24061f..d30ada3 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.common.ContainerSignatureMatcher;
-import org.apache.tez.dag.api.UserPayload;
 
 /**
  * Context for a {@link TaskScheduler}
@@ -42,7 +41,7 @@ import org.apache.tez.dag.api.UserPayload;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
-public interface TaskSchedulerContext {
+public interface TaskSchedulerContext extends ServicePluginContextBase {
 
   class AppFinalStatus {
     public final FinalApplicationStatus exitStatus;
@@ -136,14 +135,6 @@ public interface TaskSchedulerContext {
   );
 
   /**
-   * Indicate to the framework that the scheduler has run into an error. This will cause
-   * the DAG and application to be killed.
-   *
-   * @param t the relevant error
-   */
-  void onError(Throwable t);
-
-  /**
    * Inform the framework that the scheduler has determined that a previously allocated container
    * needs to be preempted
    *
@@ -164,13 +155,6 @@ public interface TaskSchedulerContext {
   // Getters
 
   /**
-   * Get the UserPayload that was configured while setting up the scheduler
-   *
-   * @return the initially configured user payload
-   */
-  UserPayload getInitialUserPayload();
-
-  /**
    * Get the tracking URL for the application. Primarily relevant to YARN
    *
    * @return the trackingUrl for the app
@@ -234,4 +218,5 @@ public interface TaskSchedulerContext {
    * @return the app master state
    */
   AMState getAMState();
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/Utils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/Utils.java b/tez-dag/src/main/java/org/apache/tez/Utils.java
index 959b536..6f03a67 100644
--- a/tez-dag/src/main/java/org/apache/tez/Utils.java
+++ b/tez-dag/src/main/java/org/apache/tez/Utils.java
@@ -15,7 +15,14 @@
 package org.apache.tez;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGTerminationCause;
+import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.ServicePluginError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,4 +70,30 @@ public class Utils {
     return "[" + schedulerIndex + ":" + name + "]";
   }
 
+  // Handles a non-fatal plugin error: terminates the reported dag if it is still the current one.
+  public static void processNonFatalServiceErrorReport(String entityString,
+                                                       ServicePluginError servicePluginError,
+                                                       String diagnostics, DagInfo dagInfo,
+                                                       AppContext appContext, String componentName) {
+    String message = "Error reported by " + componentName + " [" + entityString + "]["
+        + servicePluginError + "] " + (diagnostics == null ? "" : diagnostics);
+    if (dagInfo == null) {
+      LOG.warn("No current dag name provided. Not acting on " + message);
+      return;
+    }
+    DAG dag = appContext.getCurrentDAG();
+    if (dag != null && dag.getID().getId() == dagInfo.getIndex()) {
+      TezDAGID dagId = dag.getID();
+      // Send a kill message only if it is the same dag.
+      LOG.warn(message + ", Failing dag: [" + dagInfo.getName() + ", " + dagId + "]");
+      sendEvent(appContext, new DAGEventTerminateDag(dagId, DAGTerminationCause.SERVICE_PLUGIN_ERROR, message));
+    } else {
+      LOG.warn("Reported dag [" + dagInfo.getName() + "] is not the current dag. Not acting on " + message);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static void sendEvent(AppContext appContext, Event<?> event) {
+    appContext.getEventHandler().handle(event);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
index 0f674f3..79b9acd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
@@ -112,7 +112,7 @@ public class DAGClientHandler {
   public void tryKillDAG(String dagIdStr) throws TezException {
     DAG dag = getDAG(dagIdStr);
     LOG.info("Sending client kill to dag: " + dagIdStr);
-    dagAppMaster.tryKillDAG(dag);
+    dagAppMaster.tryKillDAG(dag, "Kill Dag request received from client");
   }
 
   public synchronized String submitDAG(DAGPlan dagPlan,
@@ -120,10 +120,11 @@ public class DAGClientHandler {
     return dagAppMaster.submitDAGToAppMaster(dagPlan, additionalAmResources);
   }
 
+  // Only to be invoked by the DAGClient.
   public synchronized void shutdownAM() throws TezException {
     LOG.info("Received message to shutdown AM");
     if (dagAppMaster != null) {
-      dagAppMaster.shutdownTezAM();
+      dagAppMaster.shutdownTezAM("AM Shutdown request received from client");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
index 9434256..7e68675 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
@@ -14,6 +14,8 @@
 
 package org.apache.tez.dag.app;
 
+import javax.annotation.Nullable;
+
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -21,7 +23,10 @@ import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
+import org.apache.tez.dag.app.launcher.ContainerLauncherManager;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.ServicePluginError;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.app.rm.container.AMContainerEvent;
 import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
@@ -39,15 +44,22 @@ public class ContainerLauncherContextImpl implements ContainerLauncherContext {
 
   private static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherContextImpl.class);
   private final AppContext context;
+  private final ContainerLauncherManager containerLauncherManager;
   private final TaskCommunicatorManagerInterface tal;
   private final UserPayload initialUserPayload;
+  private final int containerLauncherIndex;
 
-  public ContainerLauncherContextImpl(AppContext appContext, TaskCommunicatorManagerInterface tal, UserPayload initialUserPayload) {
+  public ContainerLauncherContextImpl(AppContext appContext, ContainerLauncherManager containerLauncherManager,
+                                      TaskCommunicatorManagerInterface tal,
+                                      UserPayload initialUserPayload, int containerLauncherIndex) {
     Preconditions.checkNotNull(appContext, "AppContext cannot be null");
+    Preconditions.checkNotNull(containerLauncherManager, "ContainerLauncherManager cannot be null");
     Preconditions.checkNotNull(tal, "TaskCommunicator cannot be null");
     this.context = appContext;
+    this.containerLauncherManager = containerLauncherManager;
     this.tal = tal;
     this.initialUserPayload = initialUserPayload;
+    this.containerLauncherIndex = containerLauncherIndex;
   }
 
   @Override
@@ -103,6 +115,12 @@ public class ContainerLauncherContextImpl implements ContainerLauncherContext {
     return context.getApplicationAttemptId();
   }
 
+  @Nullable
+  @Override
+  public DagInfo getCurrentDagInfo() {
+    return context.getCurrentDAG();
+  }
+
   @Override
   public Object getTaskCommunicatorMetaInfo(String taskCommName) {
     int taskCommId = context.getTaskCommunicatorIdentifier(taskCommName);
@@ -120,4 +138,11 @@ public class ContainerLauncherContextImpl implements ContainerLauncherContext {
     return null;
   }
 
+  @Override
+  public void reportError(ServicePluginError servicePluginError, String message, DagInfo dagInfo) {
+    Preconditions.checkNotNull(servicePluginError, "ServiceError must be specified");
+    containerLauncherManager.reportError(containerLauncherIndex, servicePluginError, message, dagInfo);
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 579d23f..5ac3800 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -72,9 +72,11 @@ import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
+import org.apache.tez.dag.app.dag.DAGTerminationCause;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDagCleanup;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
 import org.apache.tez.dag.app.dag.event.DAGEventInternalError;
+import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
 import org.apache.tez.dag.history.events.DAGRecoveredEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -711,8 +713,8 @@ public class DAGAppMaster extends AbstractService {
       state = DAGAppMasterState.ERROR;
       errDiagnostics = "Error in the TaskScheduler. Shutting down. ";
       addDiagnostic(errDiagnostics
-          + "Error=" + ExceptionUtils.getStackTrace(schedulingServiceErrorEvent.getThrowable()));
-      LOG.error(errDiagnostics, schedulingServiceErrorEvent.getThrowable());
+          + "Error=" + schedulingServiceErrorEvent.getDiagnosticInfo());
+      LOG.error(errDiagnostics);
       shutdownHandler.shutdown();
       break;
     case TASK_COMMUNICATOR_SERVICE_FATAL_ERROR:
@@ -724,7 +726,7 @@ public class DAGAppMaster extends AbstractService {
       Throwable error = usfe.getError();
       errDiagnostics = "Service Error: " + usfe.getDiagnosticInfo()
           + ", eventType=" + event.getType()
-          + ", exception=" + ExceptionUtils.getStackTrace(usfe.getError());
+          + ", exception=" + (usfe.getError() == null ? "None" : ExceptionUtils.getStackTrace(usfe.getError()));
       LOG.error(errDiagnostics, error);
       addDiagnostic(errDiagnostics);
 
@@ -1291,16 +1293,16 @@ public class DAGAppMaster extends AbstractService {
         + oldState + " new state: " + state);
   }
 
-  public void shutdownTezAM() throws TezException {
+  public void shutdownTezAM(String dagKillmessage) throws TezException {
     sessionStopped.set(true);
     synchronized (this) {
       this.taskSchedulerManager.setShouldUnregisterFlag();
       if (currentDAG != null
           && !currentDAG.isComplete()) {
-        //send a DAG_KILL message
+        //send a DAG_TERMINATE message
         LOG.info("Sending a kill event to the current DAG"
             + ", dagId=" + currentDAG.getID());
-        tryKillDAG(currentDAG);
+        tryKillDAG(currentDAG, dagKillmessage);
       } else {
         LOG.info("No current running DAG, shutting down the AM");
         if (isSession && !state.equals(DAGAppMasterState.ERROR)) {
@@ -1376,13 +1378,13 @@ public class DAGAppMaster extends AbstractService {
   }
 
   @SuppressWarnings("unchecked")
-  public void tryKillDAG(DAG dag) throws TezException {
+  public void tryKillDAG(DAG dag, String message) throws TezException {
     try {
       logDAGKillRequestEvent(dag.getID(), false);
     } catch (IOException e) {
       throw new TezException(e);
     }
-    dispatcher.getEventHandler().handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
+    dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dag.getID(), DAGTerminationCause.DAG_KILL, message));
   }
   
   private Map<String, LocalResource> getAdditionalLocalResourceDiff(
@@ -2235,10 +2237,10 @@ public class DAGAppMaster extends AbstractService {
     if (currentTime < (lastDAGCompletionTime + sessionTimeoutInterval)) {
       return;
     }
-    LOG.info("Session timed out"
+    String message = "Session timed out"
         + ", lastDAGCompletionTime=" + lastDAGCompletionTime + " ms"
-        + ", sessionTimeoutInterval=" + sessionTimeoutInterval + " ms");
-    shutdownTezAM();
+        + ", sessionTimeoutInterval=" + sessionTimeoutInterval + " ms";
+    shutdownTezAM(message);
   }
 
   public boolean isSession() {

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 7f88be2..a922f38 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -14,6 +14,7 @@
 
 package org.apache.tez.dag.app;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Set;
@@ -28,6 +29,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.rm.container.AMContainer;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.ServicePluginError;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
 import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest;
@@ -143,6 +146,7 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
         this);
   }
 
+  @SuppressWarnings("deprecation")
   @Override
   public String getCurrentDagName() {
     return getDag().getName();
@@ -153,11 +157,18 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
     return context.getApplicationID().toString();
   }
 
+  @SuppressWarnings("deprecation")
   @Override
   public int getCurrentDagIdenitifer() {
     return getDag().getID().getId();
   }
 
+  @Nullable
+  @Override
+  public DagInfo getCurrentDagInfo() {
+    return getDag();
+  }
+
   @Override
   public Iterable<String> getInputVertexNames(String vertexName) {
     Preconditions.checkNotNull(vertexName, "VertexName cannot be null: " + vertexName);
@@ -203,6 +214,12 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
   }
 
   @Override
+  public void reportError(@Nonnull ServicePluginError servicePluginError, String message, DagInfo dagInfo) {
+    Preconditions.checkNotNull(servicePluginError, "ServicePluginError must be set");
+    taskCommunicatorManager.reportError(taskCommunicatorIndex, servicePluginError, message, dagInfo);
+  }
+
+  @Override
   public void onStateUpdated(VertexStateUpdate event) {
     taskCommunicatorManager.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
index a196114..403e1a1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -33,6 +33,8 @@ import org.apache.commons.collections4.ListUtils;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.tez.Utils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.ServicePluginError;
 import org.apache.tez.serviceplugins.api.TaskCommunicator;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.UserPayload;
@@ -593,6 +595,30 @@ public class TaskCommunicatorManager extends AbstractService implements
     return taskCommunicators[taskCommIndex];
   }
 
+  /**
+   * Handles an error reported by a task communicator plugin. PERMANENT errors are logged and
+   * raised as a TASK_COMMUNICATOR_SERVICE_FATAL_ERROR event; all other errors are passed to
+   * Utils.processNonFatalServiceErrorReport along with the supplied DagInfo.
+   */
+  @Override
+  public void reportError(int taskCommIndex, ServicePluginError servicePluginError,
+                          String diagnostics, DagInfo dagInfo) {
+    if (servicePluginError.getErrorType() == ServicePluginError.ErrorType.PERMANENT) {
+      String msg = "Fatal Error reported by TaskCommunicator"
+          + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommIndex, context)
+          + ", servicePluginError=" + servicePluginError
+          + ", diagnostics= " + (diagnostics == null ? "" : diagnostics);
+      // msg already ends with the diagnostics, so log it as-is rather than appending them again.
+      LOG.error(msg);
+      sendEvent(new DAGAppMasterEventUserServiceFatalError(
+          DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, msg, null));
+    } else {
+      Utils.processNonFatalServiceErrorReport(
+          Utils.getTaskCommIdentifierString(taskCommIndex, context), servicePluginError,
+          diagnostics, dagInfo, context, "TaskCommunicator");
+    }
+  }
+
   private void pingContainerHeartbeatHandler(ContainerId containerId) {
     containerHeartbeatHandler.pinged(containerId);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
index e07b1a0..e0f9852 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
@@ -20,6 +20,8 @@ package org.apache.tez.dag.app;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.ServicePluginError;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.rm.container.AMContainerTask;
@@ -42,4 +44,6 @@ public interface TaskCommunicatorManagerInterface {
   void dagSubmitted();
 
   TaskCommunicatorWrapper getTaskCommunicator(int taskCommIndex);
+
+  void reportError(int taskCommIndex, ServicePluginError servicePluginError, String diagnostics, DagInfo dagInfo);
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index a01c623..dd96ab2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -36,11 +36,12 @@ import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.common.security.ACLManager;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.serviceplugins.api.DagInfo;
 
 /**
  * Main interface to interact with the job.
  */
-public interface DAG {
+public interface DAG extends DagInfo {
 
   TezDAGID getID();
   Map<String, LocalResource> getLocalResources();

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
index b6be395..b73cbe6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
@@ -26,6 +26,9 @@ public enum DAGTerminationCause {
 
   /** DAG was directly killed.   */
   DAG_KILL(DAGState.KILLED),
+
+  /** A service plugin indicated an error */
+  SERVICE_PLUGIN_ERROR(DAGState.FAILED),
   
   /** A vertex failed. */ 
   VERTEX_FAILURE(DAGState.FAILED),

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
index 816f85a..49be74d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
@@ -24,7 +24,7 @@ package org.apache.tez.dag.app.dag;
 public enum VertexTerminationCause {
 
   /** DAG was killed  */
-  DAG_KILL(VertexState.KILLED),
+  DAG_TERMINATED(VertexState.KILLED),
 
   /** Other vertex failed causing DAG to fail thus killing this vertex  */
   OTHER_VERTEX_FAILURE(VertexState.KILLED),

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventSchedulingServiceError.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventSchedulingServiceError.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventSchedulingServiceError.java
index 16625df..cf49d20 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventSchedulingServiceError.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventSchedulingServiceError.java
@@ -18,17 +18,18 @@
 
 package org.apache.tez.dag.app.dag.event;
 
-public class DAGAppMasterEventSchedulingServiceError extends DAGAppMasterEvent {
+public class DAGAppMasterEventSchedulingServiceError extends DAGAppMasterEvent
+    implements DiagnosableEvent {
 
-  private final Throwable throwable;
+  private final String diagnostics;
 
-  public DAGAppMasterEventSchedulingServiceError(Throwable t) {
+  public DAGAppMasterEventSchedulingServiceError(String diagnostics) {
     super(DAGAppMasterEventType.SCHEDULING_SERVICE_ERROR);
-    this.throwable = t;
+    this.diagnostics = diagnostics;
   }
 
-  public Throwable getThrowable() {
-    return throwable;
+  @Override
+  public String getDiagnosticInfo() {
+   return diagnostics;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventTerminateDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventTerminateDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventTerminateDag.java
new file mode 100644
index 0000000..1286e11
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventTerminateDag.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.DAGTerminationCause;
+import org.apache.tez.dag.records.TezDAGID;
+
+/** DAG_TERMINATE event that carries a termination cause and a diagnostic message. */
+public class DAGEventTerminateDag extends DAGEvent implements DiagnosableEvent {
+  private final DAGTerminationCause terminationCause;
+  private final String diagMessage;
+  public DAGEventTerminateDag(TezDAGID dagId, DAGTerminationCause terminationCause, String message) {
+    super(dagId, DAGEventType.DAG_TERMINATE);
+    this.terminationCause = terminationCause;
+    this.diagMessage = message;
+  }
+
+  public DAGTerminationCause getTerminationCause() {
+    return terminationCause;
+  }
+
+  @Override
+  public String getDiagnosticInfo() {
+    return diagMessage;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
index ea6a3cc..bf3b30a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
@@ -23,8 +23,8 @@ package org.apache.tez.dag.app.dag.event;
  */
 public enum DAGEventType {
 
-  //Producer:Client
-  DAG_KILL,
+  //Producer: ServicePluginManagers, Client (KILL)
+  DAG_TERMINATE,
 
   //Producer:AM
   DAG_INIT,

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 88dfe27..a6c6c02 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -43,7 +43,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.LimitExceededException;
-import org.apache.tez.dag.app.dag.event.DAGEventInternalError;
+import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
 import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
 import org.apache.tez.state.OnStateChangedCallback;
 import org.apache.tez.state.StateMachineTez;
@@ -253,8 +253,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
               EnumSet.of(DAGState.INITED, DAGState.FAILED),
               DAGEventType.DAG_INIT,
               new InitTransition())
-          .addTransition(DAGState.NEW, DAGState.KILLED,
-              DAGEventType.DAG_KILL,
+          .addTransition(DAGState.NEW, EnumSet.of(DAGState.KILLED, DAGState.FAILED),
+              DAGEventType.DAG_TERMINATE,
               new KillNewJobTransition())
           .addTransition(DAGState.NEW, DAGState.ERROR,
               DAGEventType.INTERNAL_ERROR,
@@ -269,8 +269,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           .addTransition(DAGState.INITED, DAGState.RUNNING,
               DAGEventType.DAG_START,
               new StartTransition())
-          .addTransition(DAGState.INITED, DAGState.KILLED,
-              DAGEventType.DAG_KILL,
+          .addTransition(DAGState.INITED, EnumSet.of(DAGState.KILLED, DAGState.FAILED),
+              DAGEventType.DAG_TERMINATE,
               new KillInitedJobTransition())
           .addTransition(DAGState.INITED, DAGState.ERROR,
               DAGEventType.INTERNAL_ERROR,
@@ -287,7 +287,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
               DAGEventType.DAG_VERTEX_RERUNNING,
               new VertexReRunningTransition())
           .addTransition(DAGState.RUNNING, DAGState.TERMINATING,
-              DAGEventType.DAG_KILL, new DAGKilledTransition())
+              DAGEventType.DAG_TERMINATE, new DAGKilledTransition())
           .addTransition(DAGState.RUNNING, DAGState.RUNNING,
               DAGEventType.DAG_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
@@ -311,7 +311,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
               DAGEventType.DAG_COMMIT_COMPLETED,
               COMMIT_COMPLETED_TRANSITION)
           .addTransition(DAGState.COMMITTING, DAGState.TERMINATING, 
-              DAGEventType.DAG_KILL,
+              DAGEventType.DAG_TERMINATE,
               new DAGKilledWhileCommittingTransition())
           .addTransition(
               DAGState.COMMITTING,
@@ -354,7 +354,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
               // Ignore-able events
           .addTransition(DAGState.TERMINATING, DAGState.TERMINATING,
-              EnumSet.of(DAGEventType.DAG_KILL,
+              EnumSet.of(DAGEventType.DAG_TERMINATE,
                          DAGEventType.DAG_VERTEX_RERUNNING,
                          DAGEventType.DAG_SCHEDULER_UPDATE))
 
@@ -370,7 +370,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
           .addTransition(DAGState.SUCCEEDED, DAGState.SUCCEEDED,
-              EnumSet.of(DAGEventType.DAG_KILL,
+              EnumSet.of(DAGEventType.DAG_TERMINATE,
                   DAGEventType.DAG_SCHEDULER_UPDATE,
                   DAGEventType.DAG_VERTEX_COMPLETED))
 
@@ -386,7 +386,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
           .addTransition(DAGState.FAILED, DAGState.FAILED,
-              EnumSet.of(DAGEventType.DAG_KILL,
+              EnumSet.of(DAGEventType.DAG_TERMINATE,
                   DAGEventType.DAG_START,
                   DAGEventType.DAG_VERTEX_RERUNNING,
                   DAGEventType.DAG_SCHEDULER_UPDATE,
@@ -404,7 +404,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
           .addTransition(DAGState.KILLED, DAGState.KILLED,
-              EnumSet.of(DAGEventType.DAG_KILL,
+              EnumSet.of(DAGEventType.DAG_TERMINATE,
                   DAGEventType.DAG_START,
                   DAGEventType.DAG_VERTEX_RERUNNING,
                   DAGEventType.DAG_SCHEDULER_UPDATE,
@@ -415,7 +415,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
               DAGState.ERROR,
               DAGState.ERROR,
               EnumSet.of(
-                  DAGEventType.DAG_KILL,
+                  DAGEventType.DAG_TERMINATE,
                   DAGEventType.DAG_INIT,
                   DAGEventType.DAG_START,
                   DAGEventType.DAG_VERTEX_COMPLETED,
@@ -1424,6 +1424,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   @Override
+  public int getIndex() {
+    return dagId.getId();
+  }
+
+  @Override
   public String getName() {
     return dagName;
   }
@@ -1836,28 +1841,41 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     }
   }
 
+  private void addDiagnostics(DiagnosableEvent event) {
+    if (event.getDiagnosticInfo() != null && !event.getDiagnosticInfo().isEmpty()) {
+      addDiagnostic(event.getDiagnosticInfo());
+    }
+  }
+
   // Task-start has been moved out of InitTransition, so this arc simply
   // hardcodes 0 for both map and reduce finished tasks.
-  private static class KillNewJobTransition
-      implements SingleArcTransition<DAGImpl, DAGEvent> {
+  private static class KillNewJobTransition implements
+      MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
 
     @Override
-    public void transition(DAGImpl dag, DAGEvent dagEvent) {
+    public DAGState transition(DAGImpl dag, DAGEvent dagEvent) {
+      DAGEventTerminateDag event = (DAGEventTerminateDag) dagEvent;
       dag.setFinishTime();
-      dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
-      dag.finished(DAGState.KILLED);
+      dag.trySetTerminationCause(event.getTerminationCause());
+      dag.addDiagnostic("Dag received [" + event.getType() + ", " + event.getTerminationCause() +
+          "] in NEW state.");
+      dag.addDiagnostics(event);
+      return dag.finished(event.getTerminationCause().getFinishedState());
     }
 
   }
 
-  private static class KillInitedJobTransition
-      implements SingleArcTransition<DAGImpl, DAGEvent> {
+  private static class KillInitedJobTransition implements
+      MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
 
     @Override
-    public void transition(DAGImpl dag, DAGEvent dagEvent) {
-      dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
-      dag.addDiagnostic("Job received Kill in INITED state.");
-      dag.finished(DAGState.KILLED);
+    public DAGState transition(DAGImpl dag, DAGEvent dagEvent) {
+      DAGEventTerminateDag event = (DAGEventTerminateDag) dagEvent;
+      dag.trySetTerminationCause(event.getTerminationCause());
+      dag.addDiagnostic("Dag received [" + event.getType() + ", " + event.getTerminationCause() +
+          "] in INITED state.");
+      dag.addDiagnostics(event);
+      return dag.finished(event.getTerminationCause().getFinishedState());
     }
 
   }
@@ -1865,11 +1883,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private static class DAGKilledTransition
       implements SingleArcTransition<DAGImpl, DAGEvent> {
     @Override
-    public void transition(DAGImpl job, DAGEvent event) {
-      String msg = "Job received Kill while in RUNNING state.";
+    public void transition(DAGImpl job, DAGEvent dagEvent) {
+      DAGEventTerminateDag event = (DAGEventTerminateDag) dagEvent;
+      String msg = "Dag received [" + event.getType() + ", " + event.getTerminationCause() +
+          "] in RUNNING state.";
       LOG.info(msg);
       job.addDiagnostic(msg);
-      job.enactKill(DAGTerminationCause.DAG_KILL, VertexTerminationCause.DAG_KILL);
+      job.addDiagnostics(event);
+      job.enactKill(event.getTerminationCause(), VertexTerminationCause.DAG_TERMINATED);
       // Commit may happen when dag is still in RUNNING (vertex group commit)
       job.cancelCommits();
       // TODO Metrics
@@ -1883,12 +1904,15 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     implements SingleArcTransition<DAGImpl, DAGEvent> {
 
     @Override
-    public void transition(DAGImpl dag, DAGEvent event) {
-      String diag = "DAG received Kill while in COMMITTING state.";
+    public void transition(DAGImpl dag, DAGEvent dagEvent) {
+      DAGEventTerminateDag event = (DAGEventTerminateDag) dagEvent;
+      String diag = "Dag received [" + event.getType() + ", " + event.getTerminationCause() +
+          "] in COMMITTING state.";
       LOG.info(diag);
       dag.addDiagnostic(diag);
+      dag.addDiagnostics(event);
       dag.cancelCommits();
-      dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
+      dag.trySetTerminationCause(event.getTerminationCause());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 065974e..c8f217b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -3206,7 +3206,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       VertexEventTermination vet = (VertexEventTermination) event;
       VertexTerminationCause trigger = vet.getTerminationCause();
       switch(trigger){
-        case DAG_KILL : vertex.tryEnactKill(trigger, TaskTerminationCause.DAG_KILL); break;
+        case DAG_TERMINATED: vertex.tryEnactKill(trigger, TaskTerminationCause.DAG_KILL); break;
         case OWN_TASK_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_TASK_FAILURE); break;
         case ROOT_INPUT_INIT_FAILURE:
         case COMMIT_FAILURE:

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
index 98237c1..250afd8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
@@ -44,6 +44,8 @@ import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.rm.ContainerLauncherEvent;
 import org.apache.tez.dag.app.rm.ContainerLauncherLaunchRequestEvent;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.ServicePluginError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,16 +60,9 @@ public class ContainerLauncherManager extends AbstractService
   final ContainerLauncherContext containerLauncherContexts[];
   protected final ServicePluginLifecycleAbstractService[] containerLauncherServiceWrappers;
   private final AppContext appContext;
+  private final boolean isIncompleteCtor;
+
 
-  @VisibleForTesting
-  public ContainerLauncherManager(ContainerLauncher containerLauncher, AppContext context) {
-    super(ContainerLauncherManager.class.getName());
-    this.appContext = context;
-    containerLaunchers = new ContainerLauncherWrapper[] {new ContainerLauncherWrapper(containerLauncher)};
-    containerLauncherContexts = new ContainerLauncherContext[] {containerLauncher.getContext()};
-    containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[]{
-        new ServicePluginLifecycleAbstractService<>(containerLauncher)};
-  }
 
   // Accepting conf to setup final parameters, if required.
   public ContainerLauncherManager(AppContext context,
@@ -77,6 +72,7 @@ public class ContainerLauncherManager extends AbstractService
                                   boolean isPureLocalMode) throws TezException {
     super(ContainerLauncherManager.class.getName());
 
+    this.isIncompleteCtor = false;
     this.appContext = context;
     Preconditions.checkArgument(
         containerLauncherDescriptors != null && !containerLauncherDescriptors.isEmpty(),
@@ -89,7 +85,7 @@ public class ContainerLauncherManager extends AbstractService
     for (int i = 0; i < containerLauncherDescriptors.size(); i++) {
       UserPayload userPayload = containerLauncherDescriptors.get(i).getUserPayload();
       ContainerLauncherContext containerLauncherContext =
-          new ContainerLauncherContextImpl(context, taskCommunicatorManagerInterface, userPayload);
+          new ContainerLauncherContextImpl(context, this, taskCommunicatorManagerInterface, userPayload, i);
       containerLauncherContexts[i] = containerLauncherContext;
       containerLaunchers[i] = new ContainerLauncherWrapper(createContainerLauncher(containerLauncherDescriptors.get(i), context,
           containerLauncherContext, taskCommunicatorManagerInterface, workingDirectory, i, isPureLocalMode));
@@ -98,6 +94,25 @@ public class ContainerLauncherManager extends AbstractService
   }
 
   @VisibleForTesting
+  public ContainerLauncherManager(AppContext context) {
+    super(ContainerLauncherManager.class.getName());
+    this.isIncompleteCtor = true;
+    this.appContext = context;
+    containerLaunchers = new ContainerLauncherWrapper[1];
+    containerLauncherContexts = new ContainerLauncherContext[1];
+    containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[1];
+  }
+
+  /** Test-only: fills slot 0 of the arrays left empty by the AppContext-only constructor. */
+  @VisibleForTesting
+  public void setContainerLauncher(ContainerLauncher containerLauncher) {
+    Preconditions.checkState(isIncompleteCtor, "Can only be used with the Test constructor");
+    containerLaunchers[0] = new ContainerLauncherWrapper(containerLauncher);
+    containerLauncherContexts[0] = containerLauncher.getContext();
+    containerLauncherServiceWrappers[0] = new ServicePluginLifecycleAbstractService<>(containerLauncher);
+  }
+
+  @VisibleForTesting
   ContainerLauncher createContainerLauncher(
       NamedEntityDescriptor containerLauncherDescriptor,
       AppContext context,
@@ -236,6 +251,30 @@ public class ContainerLauncherManager extends AbstractService
     }
   }
 
+  /**
+   * Routes an error raised by a container launcher plugin: PERMANENT errors are logged and
+   * sent on as a CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR event; anything else is handed to
+   * Utils.processNonFatalServiceErrorReport together with the supplied DagInfo.
+   */
+  public void reportError(int containerLauncherIndex, ServicePluginError servicePluginError,
+                          String diagnostics, DagInfo dagInfo) {
+    if (servicePluginError.getErrorType() != ServicePluginError.ErrorType.PERMANENT) {
+      Utils.processNonFatalServiceErrorReport(
+          Utils.getContainerLauncherIdentifierString(containerLauncherIndex, appContext),
+          servicePluginError, diagnostics, dagInfo, appContext, "ContainerLauncher");
+      return;
+    }
+    String launcherId =
+        Utils.getContainerLauncherIdentifierString(containerLauncherIndex, appContext);
+    String fatalMsg = "Fatal Error reported by ContainerLauncher"
+        + ", containerLauncher=" + launcherId
+        + ", servicePluginError=" + servicePluginError
+        + ", diagnostics= " + (diagnostics == null ? "" : diagnostics);
+    LOG.error(fatalMsg);
+    sendEvent(new DAGAppMasterEventUserServiceFatalError(
+        DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR, fatalMsg, null));
+  }
+
   @SuppressWarnings("unchecked")
   private void sendEvent(Event<?> event) {
     appContext.getEventHandler().handle(event);

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
index 37aa96b..fb4198b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
@@ -14,10 +14,12 @@
 
 package org.apache.tez.dag.app.rm;
 
+import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -29,6 +31,8 @@ import org.apache.tez.common.ContainerSignatureMatcher;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.ServicePluginError;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 
 public class TaskSchedulerContextImpl implements TaskSchedulerContext {
@@ -94,11 +98,6 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext {
   }
 
   @Override
-  public void onError(Throwable t) {
-    taskSchedulerManager.onError(schedulerId, t);
-  }
-
-  @Override
   public float getProgress() {
     return taskSchedulerManager.getProgress(schedulerId);
   }
@@ -139,6 +138,12 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext {
     return appContext.getApplicationAttemptId();
   }
 
+  @Nullable
+  @Override
+  public DagInfo getCurrentDagInfo() {
+    return appContext.getCurrentDAG();
+  }
+
   @Override
   public String getAppHostName() {
     return appHostName;
@@ -175,4 +180,11 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext {
         throw new TezUncheckedException("Unexpected state " + appContext.getAMState());
     }
   }
+
+  @Override
+  public void reportError(ServicePluginError servicePluginError, String diagnostics,
+                          DagInfo dagInfo) {
+    Preconditions.checkNotNull(servicePluginError, "ServicePluginError must be specified");
+    taskSchedulerManager.reportError(schedulerId, servicePluginError, diagnostics, dagInfo);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
index 9e4c8e0..7e1988b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.dag.app.rm;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.common.ContainerSignatureMatcher;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.ServicePluginError;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 
 /**
@@ -97,8 +101,9 @@ class TaskSchedulerContextImplWrapper implements TaskSchedulerContext {
   }
 
   @Override
-  public void onError(Throwable t) {
-    executorService.submit(new OnErrorCallable(real, t));
+  public void reportError(@Nonnull ServicePluginError servicePluginError, String message,
+                          DagInfo dagInfo) {
+    executorService.submit(new ReportErrorCallable(real, servicePluginError, message, dagInfo));
   }
 
   @Override
@@ -156,6 +161,12 @@ class TaskSchedulerContextImplWrapper implements TaskSchedulerContext {
     return real.getApplicationAttemptId();
   }
 
+  @Nullable
+  @Override
+  public DagInfo getCurrentDagInfo() {
+    return real.getCurrentDagInfo();
+  }
+
   @Override
   public String getAppHostName() {
     return real.getAppHostName();
@@ -175,6 +186,7 @@ class TaskSchedulerContextImplWrapper implements TaskSchedulerContext {
   public AMState getAMState() {
     return real.getAMState();
   }
+
   // End of getters which do not need to go through a thread. Underlying implementation
   // does not use locks.
 
@@ -301,19 +313,24 @@ class TaskSchedulerContextImplWrapper implements TaskSchedulerContext {
     }
   }
 
-  static class OnErrorCallable extends TaskSchedulerContextCallbackBase implements
-      Callable<Void> {
+  static class ReportErrorCallable extends TaskSchedulerContextCallbackBase implements Callable<Void> {
 
-    private final Throwable throwable;
+    private final ServicePluginError servicePluginError;
+    private final String message;
+    private final DagInfo dagInfo;
 
-    public OnErrorCallable(TaskSchedulerContext app, Throwable throwable) {
+    public ReportErrorCallable(TaskSchedulerContext app,
+                               ServicePluginError servicePluginError, String message,
+                               DagInfo dagInfo) {
       super(app);
-      this.throwable = throwable;
+      this.servicePluginError = servicePluginError;
+      this.message = message;
+      this.dagInfo = dagInfo;
     }
 
     @Override
     public Void call() throws Exception {
-      app.onError(throwable);
+      app.reportError(servicePluginError, message, dagInfo);
       return null;
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index fa9fb81..5317440 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -38,6 +38,8 @@ import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.ServicePluginError;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus;
@@ -844,9 +846,36 @@ public class TaskSchedulerManager extends AbstractService implements
     return dagAppMaster.getProgress();
   }
 
-  public void onError(int schedulerId, Throwable t) {
-    LOG.info("Error reported by scheduler {} - {}", schedulerId, t);
-    sendEvent(new DAGAppMasterEventSchedulingServiceError(t));
+  /** Routes an error reported by a task scheduler plugin to the matching AM event or handler. */
+  public void reportError(int taskSchedulerIndex, ServicePluginError servicePluginError,
+                          String diagnostics, DagInfo dagInfo) {
+    if (servicePluginError == YarnTaskSchedulerServiceError.RESOURCEMANAGER_ERROR) {
+      // Two placeholders need two arguments; a single concatenated one leaves a literal "{}".
+      LOG.info("Error reported by scheduler {} - {}",
+          Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex, appContext), diagnostics);
+      if (taskSchedulerDescriptors[taskSchedulerIndex].getClassName()
+          .equals(YarnTaskSchedulerService.class.getName())) {
+        LOG.warn(
+            "Reporting a SchedulerServiceError to the DAGAppMaster since the error" +
+                " was reported by the default YARN Task Scheduler");
+        sendEvent(new DAGAppMasterEventSchedulingServiceError(diagnostics));
+      }
+    } else if (servicePluginError.getErrorType() == ServicePluginError.ErrorType.PERMANENT) {
+      String msg = "Fatal error reported by TaskScheduler"
+          + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex, appContext)
+          + ", servicePluginError=" + servicePluginError
+          + ", diagnostics= " + (diagnostics == null ? "" : diagnostics);
+      LOG.error(msg);
+      sendEvent(
+          new DAGAppMasterEventUserServiceFatalError(
+              DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
+              msg, null));
+    } else {
+      Utils.processNonFatalServiceErrorReport(
+          Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex, appContext),
+          servicePluginError, diagnostics, dagInfo,
+          appContext, "TaskScheduler");
+    }
+  }
 
   public void dagCompleted() {
@@ -964,5 +993,4 @@ public class TaskSchedulerManager extends AbstractService implements
 
     return historyUrl;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 1f05064..c1c363b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -35,7 +35,7 @@ import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -916,7 +916,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
       LOG.error("Got TaskSchedulerError, " + ExceptionUtils.getStackTrace(t));
       return;
     }
-    getContext().onError(t);
+    LOG.error("Got Error from RMClient", t);
+    getContext().reportError(YarnTaskSchedulerServiceError.RESOURCEMANAGER_ERROR, StringUtils.stringifyException(t),
+        null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerServiceError.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerServiceError.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerServiceError.java
new file mode 100644
index 0000000..e8017dd
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerServiceError.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.tez.serviceplugins.api.ServicePluginError;
+
+public enum YarnTaskSchedulerServiceError implements ServicePluginError {
+
+  RESOURCEMANAGER_ERROR;
+
+  @Override
+  public Enum getEnum() {
+    return this;
+  }
+
+  @Override
+  public ErrorType getErrorType() {
+    return ErrorType.PERMANENT;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
index c55bdbd..c551b09 100644
--- a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
@@ -44,7 +43,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 // Do not make calls into this from within a held lock.
 
 // TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
-public interface TaskCommunicatorContext {
+public interface TaskCommunicatorContext extends ServicePluginContextBase {
 
   // TODO TEZ-2003 (post) TEZ-2666 Enhancements to API
   // - Consolidate usage of IDs
@@ -57,12 +56,6 @@ public interface TaskCommunicatorContext {
   // - Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc.
   // - Handling of containres / tasks which no longer exist in the system (formalized interface instead of a shouldDie notification)
 
-  /**
-   * Get the UserPayload that was configured while setting up the task communicator
-   *
-   * @return the initially configured user payload
-   */
-  UserPayload getInitialUserPayload();
 
   /**
    * Get the application attempt id for the running application. Relevant when running under YARN
@@ -170,11 +163,14 @@ public interface TaskCommunicatorContext {
    */
   void registerForVertexStateUpdates(String vertexName, @Nullable Set<VertexState> stateSet);
 
+  // TODO TEZ-3120 Remove deprecated methods
   /**
    * Get the name of the currently executing dag
    *
    * @return the name of the currently executing dag
+   * @deprecated replaced by {@link TaskCommunicatorContext#getCurrentDagInfo}
    */
+  @Deprecated
   String getCurrentDagName();
 
   /**
@@ -183,10 +179,13 @@ public interface TaskCommunicatorContext {
    */
   String getCurrentAppIdentifier();
 
+  // TODO TEZ-3120 Remove deprecated methods
   /**
    * Get the identifier for the currently executing dag.
    * @return a numerical identifier for the currently running DAG. This is unique within the currently running application.
+   * @deprecated replaced by {@link TaskCommunicatorContext#getCurrentDagInfo}
    */
+  @Deprecated
   int getCurrentDagIdenitifer();
 
   /**
@@ -237,4 +236,5 @@ public interface TaskCommunicatorContext {
    * @return time when the current dag started executing
    */
   long getDagStartTime();
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
index 80414ba..23a5191 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
@@ -112,7 +112,7 @@ public class TestDAGClientHandler {
     }
     dagClientHandler.tryKillDAG("dag_9999_0001_1");
     ArgumentCaptor<DAG> eventCaptor = ArgumentCaptor.forClass(DAG.class);
-    verify(mockDagAM, times(1)).tryKillDAG(eventCaptor.capture());
+    verify(mockDagAM, times(1)).tryKillDAG(eventCaptor.capture(), eq("Kill Dag request received from client"));
     assertEquals(1, eventCaptor.getAllValues().size());
     assertTrue(eventCaptor.getAllValues().get(0) instanceof DAG);
     assertEquals("dag_9999_0001_1",  ((DAG)eventCaptor.getAllValues().get(0)).getID().toString());
@@ -125,7 +125,7 @@ public class TestDAGClientHandler {
     
     // shutdown
     dagClientHandler.shutdownAM();
-    verify(mockDagAM).shutdownTezAM();
+    verify(mockDagAM).shutdownTezAM(eq("AM Shutdown request received from client"));
   }
   
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 08f81fb..b021a36 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -520,10 +520,12 @@ public class MockDAGAppMaster extends DAGAppMaster {
     } catch (IOException e) {
       throw new TezUncheckedException(e);
     }
+    ContainerLauncherManager clManager = new ContainerLauncherManager(getContext());
     ContainerLauncherContext containerLauncherContext =
-        new ContainerLauncherContextImpl(getContext(), getTaskCommunicatorManager(), userPayload);
+        new ContainerLauncherContextImpl(getContext(), clManager, getTaskCommunicatorManager(), userPayload, 0);
     containerLauncher = new MockContainerLauncher(launcherGoFlag, containerLauncherContext);
-    return new ContainerLauncherManager(containerLauncher, getContext());
+    clManager.setContainerLauncher(containerLauncher);
+    return clManager;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index d5ee67d..74ac51e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -854,7 +854,8 @@ public class TestMockDAGAppMaster {
 
     tezClient.submitDAG(dag);
     mockLauncher.waitTillContainersLaunched();
-    mockApp.handle(new DAGAppMasterEventSchedulingServiceError(new RuntimeException("Mock error")));
+    mockApp.handle(new DAGAppMasterEventSchedulingServiceError(
+        org.apache.hadoop.util.StringUtils.stringifyException(new RuntimeException("Mock error"))));
 
     while(!mockApp.getShutdownHandler().wasShutdownInvoked()) {
       Thread.sleep(100);

http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
index 5323928..c7f97d3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
@@ -23,11 +23,13 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
@@ -42,6 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -49,6 +52,11 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
+import org.apache.tez.dag.helpers.DagInfoImplForTest;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
+import org.apache.tez.serviceplugins.api.ServicePluginException;
 import org.apache.tez.serviceplugins.api.TaskCommunicator;
 import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
 import org.apache.tez.dag.api.TezConstants;
@@ -62,6 +70,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -73,7 +82,7 @@ public class TestTaskCommunicatorManager {
 
   @Before
   @After
-  public void reset() {
+  public void resetForNextTest() {
     TaskCommManagerForMultipleCommTest.reset();
   }
 
@@ -233,6 +242,71 @@ public class TestTaskCommunicatorManager {
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
+  public void testReportFailureFromTaskCommunicator() throws TezException {
+    String dagName = DAG_NAME;
+    EventHandler eventHandler = mock(EventHandler.class);
+    AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+    doReturn("testTaskCommunicator").when(appContext).getTaskCommunicatorName(0);
+    doReturn(eventHandler).when(appContext).getEventHandler();
+
+    DAG dag = mock(DAG.class);
+    TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(1, 0), DAG_INDEX);
+    doReturn(dagName).when(dag).getName();
+    doReturn(dagId).when(dag).getID();
+    doReturn(dag).when(appContext).getCurrentDAG();
+
+    NamedEntityDescriptor<TaskCommunicatorDescriptor> namedEntityDescriptor =
+        new NamedEntityDescriptor<>("testTaskCommunicator", TaskCommForFailureTest.class.getName());
+    List<NamedEntityDescriptor> list = new LinkedList<>();
+    list.add(namedEntityDescriptor);
+
+
+    TaskCommunicatorManager taskCommManager =
+        new TaskCommunicatorManager(appContext, mock(TaskHeartbeatHandler.class),
+            mock(ContainerHeartbeatHandler.class), list);
+    try {
+      taskCommManager.init(new Configuration());
+      taskCommManager.start();
+
+      taskCommManager.registerRunningContainer(mock(ContainerId.class), 0);
+      ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class);
+      verify(eventHandler, times(1)).handle(argumentCaptor.capture());
+
+      Event rawEvent = argumentCaptor.getValue();
+      assertTrue(rawEvent instanceof DAGEventTerminateDag);
+      DAGEventTerminateDag killEvent = (DAGEventTerminateDag) rawEvent;
+      assertTrue(killEvent.getDiagnosticInfo().contains("ReportError"));
+      assertTrue(killEvent.getDiagnosticInfo()
+          .contains(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE.name()));
+      assertTrue(killEvent.getDiagnosticInfo().contains("[0:testTaskCommunicator]"));
+
+
+      reset(eventHandler);
+
+      taskCommManager.dagComplete(dag);
+
+      argumentCaptor = ArgumentCaptor.forClass(Event.class);
+
+      verify(eventHandler, times(1)).handle(argumentCaptor.capture());
+      rawEvent = argumentCaptor.getValue();
+
+      assertTrue(rawEvent instanceof DAGAppMasterEventUserServiceFatalError);
+      DAGAppMasterEventUserServiceFatalError event =
+          (DAGAppMasterEventUserServiceFatalError) rawEvent;
+      assertEquals(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, event.getType());
+      assertTrue(event.getDiagnosticInfo().contains("ReportedFatalError"));
+      assertTrue(
+          event.getDiagnosticInfo().contains(ServicePluginErrorDefaults.INCONSISTENT_STATE.name()));
+      assertTrue(event.getDiagnosticInfo().contains("[0:testTaskCommunicator]"));
+
+    } finally {
+      taskCommManager.stop();
+    }
+
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
   public void testTaskCommunicatorUserError() {
     TaskCommunicatorContextImpl taskCommContext = mock(TaskCommunicatorContextImpl.class);
     TaskCommunicator taskCommunicator = mock(TaskCommunicator.class, new ExceptionAnswer());
@@ -313,7 +387,6 @@ public class TestTaskCommunicatorManager {
     }
   }
 
-
   static class TaskCommManagerForMultipleCommTest extends TaskCommunicatorManager {
 
     // All variables setup as static since methods being overridden are invoked by the ContainerLauncherRouter ctor,
@@ -460,4 +533,63 @@ public class TestTaskCommunicatorManager {
       return null;
     }
   }
+
+  private static final String DAG_NAME = "dagName";
+  private static final int DAG_INDEX = 1;
+  public static class TaskCommForFailureTest extends TaskCommunicator {
+
+    public TaskCommForFailureTest(
+        TaskCommunicatorContext taskCommunicatorContext) {
+      super(taskCommunicatorContext);
+    }
+
+    @Override
+    public void registerRunningContainer(ContainerId containerId, String hostname, int port) throws
+        ServicePluginException {
+      getContext()
+          .reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE, "ReportError", new DagInfoImplForTest(DAG_INDEX, DAG_NAME));
+    }
+
+    @Override
+    public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason,
+                                     @Nullable String diagnostics) throws ServicePluginException {
+
+    }
+
+    @Override
+    public void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec,
+                                           Map<String, LocalResource> additionalResources,
+                                           Credentials credentials, boolean credentialsChanged,
+                                           int priority) throws ServicePluginException {
+
+    }
+
+    @Override
+    public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID,
+                                             TaskAttemptEndReason endReason,
+                                             @Nullable String diagnostics) throws
+        ServicePluginException {
+
+    }
+
+    @Override
+    public InetSocketAddress getAddress() throws ServicePluginException {
+      return null;
+    }
+
+    @Override
+    public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws ServicePluginException {
+
+    }
+
+    @Override
+    public void dagComplete(int dagIdentifier) throws ServicePluginException {
+      getContext().reportError(ServicePluginErrorDefaults.INCONSISTENT_STATE, "ReportedFatalError", null);
+    }
+
+    @Override
+    public Object getMetaInfo() throws ServicePluginException {
+      return null;
+    }
+  }
 }