You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/14 22:58:59 UTC

[41/50] [abbrv] tez git commit: TEZ-2678. Fix comments from reviews - part 1. (sseth)

TEZ-2678. Fix comments from reviews - part 1. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fee059b9
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fee059b9
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fee059b9

Branch: refs/heads/TEZ-2003
Commit: fee059b9030f76432b308105ded559c5195f922a
Parents: a960c64
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Aug 11 11:19:09 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 14 13:47:10 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |   1 +
 .../java/org/apache/tez/client/TezClient.java   |   2 +-
 .../org/apache/tez/client/TezClientUtils.java   |  14 +-
 .../main/java/org/apache/tez/dag/api/DAG.java   |  76 ++++++++++-
 .../apache/tez/dag/api/DagTypeConverters.java   |   2 +-
 .../api/ContainerLauncherOperationBase.java     |   8 +-
 .../api/ServicePluginsDescriptor.java           |  13 ++
 .../api/TaskAttemptEndReason.java               |   2 +-
 .../tez/serviceplugins/api/TaskScheduler.java   |   6 +-
 .../java/org/apache/tez/dag/api/TestDAG.java    |   2 +-
 .../org/apache/tez/dag/api/TestDAGPlan.java     | 113 ++++++++++++++-
 .../tez/dag/api/TestDagTypeConverters.java      |  11 +-
 .../org/apache/tez/common/TezUtilsInternal.java |   5 +-
 .../apache/tez/dag/api/TaskCommunicator.java    |  12 +-
 .../tez/dag/api/TaskCommunicatorContext.java    |   2 +-
 .../apache/tez/dag/app/TaskAttemptListener.java |   4 +-
 .../dag/app/TaskAttemptListenerImpTezDag.java   |   9 +-
 .../dag/app/TaskCommunicatorContextImpl.java    |  15 +-
 .../tez/dag/app/TezTaskCommunicatorImpl.java    |  10 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  13 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |   9 +-
 .../app/launcher/ContainerLauncherRouter.java   |  19 +--
 .../tez/dag/app/rm/AMSchedulerEventTAEnded.java |   8 +-
 .../dag/app/rm/LocalTaskSchedulerService.java   |   2 +-
 .../dag/app/rm/TaskSchedulerEventHandler.java   |   5 +-
 .../dag/app/rm/YarnTaskSchedulerService.java    |   5 +-
 .../dag/app/rm/container/AMContainerImpl.java   |  62 +++++----
 .../tez/dag/app/rm/node/AMNodeTracker.java      |   2 +-
 .../app/TestTaskAttemptListenerImplTezDag.java  |   6 +-
 .../app/TestTaskAttemptListenerImplTezDag2.java |   3 +-
 .../app/TestTaskCommunicatorContextImpl.java    |  85 ++++++++++++
 .../dag/app/TestTaskCommunicatorManager.java    |   4 +-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |  12 +-
 .../launcher/TestContainerLauncherRouter.java   |   6 +-
 .../tez/dag/app/rm/TestContainerReuse.java      | 136 +++++++++++++------
 .../app/rm/TestLocalTaskSchedulerService.java   |   4 +-
 .../tez/dag/app/rm/TestTaskScheduler.java       |  18 +--
 .../app/rm/TestTaskSchedulerEventHandler.java   |  13 +-
 .../dag/app/rm/container/TestAMContainer.java   | 127 +++++++++++------
 .../org/apache/tez/examples/JoinValidate.java   |   8 ++
 tez-ext-service-tests/pom.xml                   |   5 -
 .../rm/TezTestServiceTaskSchedulerService.java  |   5 +-
 .../TezTestServiceTaskCommunicatorImpl.java     |  10 +-
 .../tez/service/impl/ContainerRunnerImpl.java   |   2 +-
 .../apache/tez/runtime/task/TezTaskRunner2.java |  16 +--
 .../runtime/task/TaskExecutionTestHelpers.java  |   1 +
 .../runtime/task/TestContainerExecution.java    |   1 +
 47 files changed, 663 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 75fac88..fd3374e 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -45,5 +45,6 @@ ALL CHANGES:
   TEZ-2126. Add unit tests for verifying multiple schedulers, launchers, communicators.
   TEZ-2698. rebase 08/05
   TEZ-2675. Add javadocs for new pluggable components, fix problems reported by jenkins
+  TEZ-2678. Fix comments from reviews - part 1.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 373be81..036b5e8 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -471,7 +471,7 @@ public class TezClient {
 
     Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials);
     DAGPlan dagPlan = TezClientUtils.prepareAndCreateDAGPlan(dag, amConfig, tezJarResources,
-        usingTezArchiveDeploy, sessionCredentials, aclConfigs);
+        usingTezArchiveDeploy, sessionCredentials, aclConfigs, servicePluginsDescriptor);
 
     SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder();
     requestBuilder.setDAGPlan(dagPlan).build();

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 6086fa1..ecf5c07 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -609,7 +609,7 @@ public class TezClientUtils {
     if(dag != null) {
       
       DAGPlan dagPB = prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, tezLrsAsArchive,
-          sessionCreds);
+          sessionCreds, servicePluginsDescriptor);
 
       // emit protobuf DAG file style
       Path binaryPath = TezCommonUtils.getTezBinPlanStagingPath(tezSysStagingPath);
@@ -685,18 +685,19 @@ public class TezClientUtils {
   
   static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig,
       Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
-      Credentials credentials) throws IOException {
+      Credentials credentials, ServicePluginsDescriptor servicePluginsDescriptor) throws IOException {
     return prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, tezLrsAsArchive, credentials,
-        null);
+        null, servicePluginsDescriptor);
   }
 
   static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig,
       Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
-      Credentials credentials, Map<String, String> additionalDAGConfigs) throws IOException {
+      Credentials credentials, Map<String, String> additionalDAGConfigs,
+      ServicePluginsDescriptor servicePluginsDescriptor) throws IOException {
     Credentials dagCredentials = setupDAGCredentials(dag, credentials,
         amConfig.getTezConfiguration());
     return dag.createDag(amConfig.getTezConfiguration(), dagCredentials, tezJarResources,
-        amConfig.getBinaryConfLR(), tezLrsAsArchive, additionalDAGConfigs);
+        amConfig.getBinaryConfLR(), tezLrsAsArchive, additionalDAGConfigs, servicePluginsDescriptor);
   }
   
   static void maybeAddDefaultLoggingJavaOpts(String logLevel, List<String> vargs) {
@@ -776,7 +777,7 @@ public class TezClientUtils {
     }
 
     AMPluginDescriptorProto pluginDescriptorProto =
-        DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor);
+        DagTypeConverters.convertServicePluginDescriptorToProto(servicePluginsDescriptor);
     builder.setAmPluginDescriptor(pluginDescriptorProto);
 
     return builder.build();
@@ -1035,4 +1036,5 @@ public class TezClientUtils {
       }
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 927039a..78bb660 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -35,6 +35,8 @@ import org.apache.commons.collections4.bidimap.DualLinkedHashBidiMap;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
 import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
+import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -714,14 +716,15 @@ public class DAG {
                            Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
                            boolean tezLrsAsArchive) {
     return createDag(tezConf, extraCredentials, tezJarResources, binaryConfig, tezLrsAsArchive,
-        null);
+        null, null);
   }
 
   // create protobuf message describing DAG
   @Private
   public synchronized DAGPlan createDag(Configuration tezConf, Credentials extraCredentials,
       Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
-      boolean tezLrsAsArchive, Map<String, String> additionalConfigs) {
+      boolean tezLrsAsArchive, Map<String, String> additionalConfigs,
+                                        ServicePluginsDescriptor servicePluginsDescriptor) {
     verify(true);
 
     DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
@@ -732,6 +735,7 @@ public class DAG {
 
     // Setup default execution context.
     VertexExecutionContext defaultContext = getDefaultExecutionContext();
+    verifyExecutionContext(defaultContext, servicePluginsDescriptor, "DAGDefault");
     if (defaultContext != null) {
       DAGProtos.VertexExecutionContextProto contextProto = DagTypeConverters.convertToProto(
           defaultContext);
@@ -834,6 +838,7 @@ public class DAG {
 
       // Vertex ExecutionContext setup
       VertexExecutionContext execContext = vertex.getVertexExecutionContext();
+      verifyExecutionContext(execContext, servicePluginsDescriptor, vertex.getName());
       if (execContext != null) {
         DAGProtos.VertexExecutionContextProto contextProto =
             DagTypeConverters.convertToProto(execContext);
@@ -986,4 +991,71 @@ public class DAG {
     
     return dagBuilder.build();
   }
+
+  private void verifyExecutionContext(VertexExecutionContext executionContext,
+                                      ServicePluginsDescriptor servicePluginsDescriptor,
+                                      String context) {
+    if (executionContext != null) {
+      if (executionContext.shouldExecuteInContainers()) {
+        if (servicePluginsDescriptor == null || !servicePluginsDescriptor.areContainersEnabled()) {
+          throw new IllegalStateException("Invalid configuration. ExecutionContext for " + context +
+              " specifies container execution but this is disabled in the ServicePluginDescriptor");
+        }
+      }
+      if (executionContext.shouldExecuteInAm()) {
+        if (servicePluginsDescriptor == null || !servicePluginsDescriptor.isUberEnabled()) {
+          throw new IllegalStateException("Invalid configuration. ExecutionContext for " + context +
+              " specifies AM execution but this is disabled in the ServicePluginDescriptor");
+        }
+      }
+      if (executionContext.getTaskSchedulerName() != null) {
+        boolean found = false;
+        if (servicePluginsDescriptor != null) {
+          found = checkNamedEntityExists(executionContext.getTaskSchedulerName(),
+              servicePluginsDescriptor.getTaskSchedulerDescriptors());
+        }
+        if (!found) {
+          throw new IllegalStateException("Invalid configuration. ExecutionContext for " + context +
+              " specifies task scheduler as " + executionContext.getTaskSchedulerName() +
+              " which is not part of the ServicePluginDescriptor");
+        }
+      }
+      if (executionContext.getContainerLauncherName() != null) {
+        boolean found = false;
+        if (servicePluginsDescriptor != null) {
+          found = checkNamedEntityExists(executionContext.getContainerLauncherName(),
+              servicePluginsDescriptor.getContainerLauncherDescriptors());
+        }
+        if (!found) {
+          throw new IllegalStateException("Invalid configuration. ExecutionContext for " + context +
+              " specifies container launcher as " + executionContext.getContainerLauncherName() +
+              " which is not part of the ServicePluginDescriptor");
+        }
+      }
+      if (executionContext.getTaskCommName() != null) {
+        boolean found = false;
+        if (servicePluginsDescriptor != null) {
+          found = checkNamedEntityExists(executionContext.getTaskCommName(),
+              servicePluginsDescriptor.getTaskCommunicatorDescriptors());
+        }
+        if (!found) {
+          throw new IllegalStateException("Invalid configuration. ExecutionContext for " + context +
+              " specifies task communicator as " + executionContext.getTaskCommName() +
+              " which is not part of the ServicePluginDescriptor");
+        }
+      }
+    }
+  }
+
+  private boolean checkNamedEntityExists(String expected, NamedEntityDescriptor[] namedEntities) {
+    if (namedEntities == null) {
+      return false;
+    }
+    for (NamedEntityDescriptor named : namedEntities) {
+      if (named.getEntityName().equals(expected)) {
+        return true;
+      }
+    }
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 61e4d33..2823a86 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -801,7 +801,7 @@ public class DagTypeConverters {
     return builder.build();
   }
 
-  public static AMPluginDescriptorProto convertServicePluginDescriptoToProto(
+  public static AMPluginDescriptorProto convertServicePluginDescriptorToProto(
       ServicePluginsDescriptor servicePluginsDescriptor) {
     AMPluginDescriptorProto.Builder pluginDescriptorBuilder =
         AMPluginDescriptorProto.newBuilder();

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java
index 260b681..98806fa 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java
@@ -42,8 +42,8 @@ public class ContainerLauncherOperationBase {
   }
 
   /**
-   * Get the node on whcih this container is to be launched
-   * @return
+   * Get the node on which this container is to be launched
+   * @return the node id for the container
    */
   public NodeId getNodeId() {
     return nodeId;
@@ -51,7 +51,7 @@ public class ContainerLauncherOperationBase {
 
   /**
    * Get the containerId for the container
-   * @return
+   * @return the container id for the container opeartion
    */
   public ContainerId getContainerId() {
     return containerId;
@@ -59,7 +59,7 @@ public class ContainerLauncherOperationBase {
 
   /**
    * Get the security token for the container. Primarily for YARN
-   * @return
+   * @return the token for the container launch.
    */
   public Token getContainerToken() {
     return containerToken;

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
index ce35350..113b7db 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
@@ -14,6 +14,8 @@
 
 package org.apache.tez.serviceplugins.api;
 
+import java.util.Arrays;
+
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -138,4 +140,15 @@ public class ServicePluginsDescriptor {
   public TaskCommunicatorDescriptor[] getTaskCommunicatorDescriptors() {
     return taskCommunicatorDescriptors;
   }
+
+  @Override
+  public String toString() {
+    return "ServicePluginsDescriptor{" +
+        "enableContainers=" + enableContainers +
+        ", enableUber=" + enableUber +
+        ", taskSchedulerDescriptors=" + Arrays.toString(taskSchedulerDescriptors) +
+        ", containerLauncherDescriptors=" + Arrays.toString(containerLauncherDescriptors) +
+        ", taskCommunicatorDescriptors=" + Arrays.toString(taskCommunicatorDescriptors) +
+        '}';
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java
index 4255c28..bff36cd 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 public enum TaskAttemptEndReason {
   NODE_FAILED, // Completed because the node running the container was marked as dead
   COMMUNICATION_ERROR, // Communication error with the task
-  SERVICE_BUSY, // External service busy
+  EXECUTOR_BUSY, // External service busy
   INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision
   EXTERNAL_PREEMPTION, // Preempted due to cluster contention
   APPLICATION_ERROR, // An error in the AM caused by user code

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
index f05bddc..9a864c5 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
@@ -14,6 +14,8 @@
 
 package org.apache.tez.serviceplugins.api;
 
+import javax.annotation.Nullable;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -178,11 +180,13 @@ public abstract class TaskScheduler implements ServicePluginLifecycle {
    * @param task          the task being de-allocated.
    * @param taskSucceeded whether the task succeeded or not
    * @param endReason     the reason for the task failure
+   * @param diagnostics   additional diagnostics information which may be relevant
    * @return true if the task was associated with a container, false if the task was not associated
    * with a container
    */
   public abstract boolean deallocateTask(Object task, boolean taskSucceeded,
-                                         TaskAttemptEndReason endReason);
+                                         TaskAttemptEndReason endReason,
+                                         @Nullable String diagnostics);
 
   /**
    * A request to de-allocate a previously allocated container.

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
index 3fe17df..268267b 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
@@ -86,7 +86,7 @@ public class TestDAG {
         dummyTaskCount, dummyTaskResource);
 
     DAG dag = DAG.create("testDAG");
-    dag.createVertexGroup("group_1", v1,v2);
+    dag.createVertexGroup("group_1", v1, v2);
 
     try {
       dag.createVertexGroup("group_1", v2, v3);

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index cd42109..7edea2f 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -38,7 +38,6 @@ import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
-import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
@@ -46,6 +45,10 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
 import org.apache.tez.dag.api.records.DAGProtos.VertexExecutionContextProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
+import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
+import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -317,6 +320,108 @@ public class TestDAGPlan {
   }
 
   @Test(timeout = 5000)
+  public void testInvalidExecContext_1() {
+    DAG dag = DAG.create("dag1");
+    dag.setExecutionContext(VertexExecutionContext.createExecuteInAm(true));
+    Vertex v1 = Vertex.create("testvertex", ProcessorDescriptor.create("processor1"), 1);
+    dag.addVertex(v1);
+
+    try {
+      dag.createDag(new TezConfiguration(false), null, null, null, true, null, null);
+      fail("Expecting dag create to fail due to invalid ServicePluginDescriptor");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("AM execution"));
+    }
+
+    dag.setExecutionContext(VertexExecutionContext.createExecuteInContainers(true));
+
+    try {
+      dag.createDag(new TezConfiguration(false), null, null, null, true, null, null);
+      fail("Expecting dag create to fail due to invalid ServicePluginDescriptor");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("container execution"));
+    }
+
+  }
+
+  @Test(timeout = 5000)
+  public void testInvalidExecContext_2() {
+
+    ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor
+        .create(false,
+            new TaskSchedulerDescriptor[]{TaskSchedulerDescriptor.create("plugin", null)},
+            new ContainerLauncherDescriptor[]{ContainerLauncherDescriptor.create("plugin", null)},
+            new TaskCommunicatorDescriptor[]{TaskCommunicatorDescriptor.create("plugin", null)});
+
+    VertexExecutionContext validExecContext = VertexExecutionContext.create("plugin", "plugin",
+        "plugin");
+    VertexExecutionContext invalidExecContext1 =
+        VertexExecutionContext.create("invalidplugin", "plugin", "plugin");
+    VertexExecutionContext invalidExecContext2 =
+        VertexExecutionContext.create("plugin", "invalidplugin", "plugin");
+    VertexExecutionContext invalidExecContext3 =
+        VertexExecutionContext.create("plugin", "plugin", "invalidplugin");
+
+
+    DAG dag = DAG.create("dag1");
+    dag.setExecutionContext(VertexExecutionContext.createExecuteInContainers(true));
+    Vertex v1 = Vertex.create("testvertex", ProcessorDescriptor.create("processor1"), 1);
+    dag.addVertex(v1);
+
+    // Should succeed. Default context is containers.
+    dag.createDag(new TezConfiguration(false), null, null, null, true, null,
+        servicePluginsDescriptor);
+
+
+    // Set execute in AM should fail
+    v1.setExecutionContext(VertexExecutionContext.createExecuteInAm(true));
+    try {
+      dag.createDag(new TezConfiguration(false), null, null, null, true, null, servicePluginsDescriptor);
+      fail("Expecting dag create to fail due to invalid ServicePluginDescriptor");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("AM execution"));
+    }
+
+    // Valid context
+    v1.setExecutionContext(validExecContext);
+    dag.createDag(new TezConfiguration(false), null, null, null, true, null, servicePluginsDescriptor);
+
+    // Invalid task scheduler
+    v1.setExecutionContext(invalidExecContext1);
+    try {
+      dag.createDag(new TezConfiguration(false), null, null, null, true, null, servicePluginsDescriptor);
+      fail("Expecting dag create to fail due to invalid ServicePluginDescriptor");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("testvertex"));
+      assertTrue(e.getMessage().contains("task scheduler"));
+      assertTrue(e.getMessage().contains("invalidplugin"));
+    }
+
+    // Invalid ContainerLauncher
+    v1.setExecutionContext(invalidExecContext2);
+    try {
+      dag.createDag(new TezConfiguration(false), null, null, null, true, null, servicePluginsDescriptor);
+      fail("Expecting dag create to fail due to invalid ServicePluginDescriptor");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("testvertex"));
+      assertTrue(e.getMessage().contains("container launcher"));
+      assertTrue(e.getMessage().contains("invalidplugin"));
+    }
+
+    // Invalid task comm
+    v1.setExecutionContext(invalidExecContext3);
+    try {
+      dag.createDag(new TezConfiguration(false), null, null, null, true, null, servicePluginsDescriptor);
+      fail("Expecting dag create to fail due to invalid ServicePluginDescriptor");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("testvertex"));
+      assertTrue(e.getMessage().contains("task communicator"));
+      assertTrue(e.getMessage().contains("invalidplugin"));
+    }
+
+  }
+
+  @Test(timeout = 5000)
   public void testServiceDescriptorPropagation() {
     DAG dag = DAG.create("testDag");
     ProcessorDescriptor pd1 = ProcessorDescriptor.create("processor1").
@@ -328,6 +433,10 @@ public class TestDAGPlan {
         VertexExecutionContext.create("plugin", "plugin", "plugin");
     VertexExecutionContext v1Context = VertexExecutionContext.createExecuteInAm(true);
 
+    ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor
+        .create(true, new TaskSchedulerDescriptor[]{TaskSchedulerDescriptor.create("plugin", null)},
+            new ContainerLauncherDescriptor[]{ContainerLauncherDescriptor.create("plugin", null)},
+            new TaskCommunicatorDescriptor[]{TaskCommunicatorDescriptor.create("plugin", null)});
 
     Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1)).setExecutionContext(v1Context);
     Vertex v2 = Vertex.create("v2", pd2, 1, Resource.newInstance(1024, 1));
@@ -347,7 +456,7 @@ public class TestDAGPlan {
     dag.addVertex(v1).addVertex(v2).addEdge(edge);
     dag.setExecutionContext(defaultExecutionContext);
 
-    DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true);
+    DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true, null, servicePluginsDescriptor);
 
     assertEquals(2, dagProto.getVertexCount());
     assertEquals(1, dagProto.getEdgeCount());

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
index e37f849..6f795fc 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
@@ -33,16 +33,13 @@ import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
-import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexExecutionContextProto;
-import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
 import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
 import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
-import org.apache.tez.serviceplugins.api.TaskScheduler;
 import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
 import org.junit.Assert;
 import org.junit.Test;
@@ -152,7 +149,7 @@ public class TestDagTypeConverters {
 
     // Uber-execution
     servicePluginsDescriptor = ServicePluginsDescriptor.create(true);
-    proto = DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor);
+    proto = DagTypeConverters.convertServicePluginDescriptorToProto(servicePluginsDescriptor);
     assertTrue(proto.hasUberEnabled());
     assertTrue(proto.hasContainersEnabled());
     assertTrue(proto.getUberEnabled());
@@ -168,7 +165,7 @@ public class TestDagTypeConverters {
 
     servicePluginsDescriptor = ServicePluginsDescriptor.create(taskSchedulers, containerLaunchers,
         taskComms);
-    proto = DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor);
+    proto = DagTypeConverters.convertServicePluginDescriptorToProto(servicePluginsDescriptor);
     assertTrue(proto.hasUberEnabled());
     assertTrue(proto.hasContainersEnabled());
     assertFalse(proto.getUberEnabled());
@@ -185,7 +182,7 @@ public class TestDagTypeConverters {
 
     servicePluginsDescriptor = ServicePluginsDescriptor.create(taskSchedulers, containerLaunchers,
         taskComms);
-    proto = DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor);
+    proto = DagTypeConverters.convertServicePluginDescriptorToProto(servicePluginsDescriptor);
     assertTrue(proto.hasUberEnabled());
     assertTrue(proto.hasContainersEnabled());
     assertFalse(proto.getUberEnabled());
@@ -201,7 +198,7 @@ public class TestDagTypeConverters {
 
     servicePluginsDescriptor = ServicePluginsDescriptor.create(false, true, taskSchedulers, containerLaunchers,
         taskComms);
-    proto = DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor);
+    proto = DagTypeConverters.convertServicePluginDescriptorToProto(servicePluginsDescriptor);
     assertTrue(proto.hasUberEnabled());
     assertTrue(proto.hasContainersEnabled());
     assertTrue(proto.getUberEnabled());

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 1fb7ff9..d6ef901 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -45,7 +45,6 @@ import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
-import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -271,7 +270,7 @@ public class TezUtilsInternal {
     switch (taskAttemptEndReason) {
       case COMMUNICATION_ERROR:
         return TaskAttemptTerminationCause.COMMUNICATION_ERROR;
-      case SERVICE_BUSY:
+      case EXECUTOR_BUSY:
         return TaskAttemptTerminationCause.SERVICE_BUSY;
       case INTERNAL_PREEMPTION:
         return TaskAttemptTerminationCause.INTERNAL_PREEMPTION;
@@ -301,7 +300,7 @@ public class TezUtilsInternal {
       case COMMUNICATION_ERROR:
         return TaskAttemptEndReason.COMMUNICATION_ERROR;
       case SERVICE_BUSY:
-        return TaskAttemptEndReason.SERVICE_BUSY;
+        return TaskAttemptEndReason.EXECUTOR_BUSY;
       case INTERNAL_PREEMPTION:
         return TaskAttemptEndReason.INTERNAL_PREEMPTION;
       case EXTERNAL_PREEMPTION:

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index 4fc541c..f1f683b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -14,6 +14,7 @@
 
 package org.apache.tez.dag.api;
 
+import javax.annotation.Nullable;
 import java.net.InetSocketAddress;
 import java.util.Map;
 
@@ -115,8 +116,10 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
    *
    * @param containerId the associated containerId
    * @param endReason   the end reason for the container completing
+   * @param diagnostics diagnostics associated with the container end
    */
-  public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason);
+  public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason,
+                                            @Nullable String diagnostics);
 
   /**
    * Register a task attempt to execute on a container
@@ -138,14 +141,15 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
 
   /**
    * Register the completion of a task. This may be a result of preemption, the container dying,
-   * the
-   * node dying, the task completing to success
+   * the node dying, the task completing to success
    *
    * @param taskAttemptID the task attempt which has completed / needs to be completed
    * @param endReason     the endReason for the task attempt.
+   * @param diagnostics   diagnostics associated with the task end
    */
   public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID,
-                                                    TaskAttemptEndReason endReason);
+                                                    TaskAttemptEndReason endReason,
+                                                    @Nullable String diagnostics);
 
   /**
    * Return the address, if any, that the service listens on

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index 0a684e7..e81ba2b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -160,7 +160,7 @@ public interface TaskCommunicatorContext {
    *
    * @return the name of the currently executing dag
    */
-  String getCurretnDagName();
+  String getCurrentDagName();
 
   /**
    * Get the name of the Input vertices for the specified vertex.

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
index 2eec2fb..761bdb0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
@@ -34,9 +34,9 @@ public interface TaskAttemptListener {
 
   void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId);
   
-  void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason);
+  void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics);
   
-  void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId, TaskAttemptEndReason endReason);
+  void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId, TaskAttemptEndReason endReason, String diagnostics);
 
   void dagComplete(DAG dag);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index ad6f2c4..2f6e93c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -176,7 +176,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     try {
       Constructor<? extends TaskCommunicator> ctor =
           taskCommClazz.getConstructor(TaskCommunicatorContext.class);
-      ctor.setAccessible(true);
       return ctor.newInstance(taskCommunicatorContext);
     } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
       throw new TezUncheckedException(e);
@@ -366,7 +365,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   }
 
   @Override
-  public void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason) {
+  public void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId);
     }
@@ -374,7 +373,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     if (containerInfo.taskAttemptId != null) {
       registeredAttempts.remove(containerInfo.taskAttemptId);
     }
-    taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason);
+    taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason, diagnostics);
   }
 
   @Override
@@ -408,7 +407,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   }
 
   @Override
-  public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId, TaskAttemptEndReason endReason) {
+  public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId, TaskAttemptEndReason endReason, String diagnostics) {
     ContainerId containerId = registeredAttempts.remove(attemptId);
     if (containerId == null) {
       LOG.warn("Unregister task attempt: " + attemptId + " from unknown container");
@@ -422,7 +421,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
     // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
     registeredContainers.put(containerId, NULL_CONTAINER_INFO);
-    taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason);
+    taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason, diagnostics);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 0f10305..c56311c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.app.rm.container.AMContainer;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
 import org.apache.tez.dag.api.TaskHeartbeatRequest;
@@ -96,7 +97,13 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
 
   @Override
   public boolean isKnownContainer(ContainerId containerId) {
-    return context.getAllContainers().get(containerId) != null;
+    AMContainer amContainer = context.getAllContainers().get(containerId);
+    if (amContainer == null ||
+        amContainer.getTaskCommunicatorIdentifier() != taskCommunicatorIndex) {
+      return false;
+    } else {
+      return true;
+    }
   }
 
   @Override
@@ -106,7 +113,9 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
 
   @Override
   public void containerAlive(ContainerId containerId) {
-    taskAttemptListener.containerAlive(containerId);
+    if (isKnownContainer(containerId)) {
+      taskAttemptListener.containerAlive(containerId);
+    }
   }
 
   @Override
@@ -136,7 +145,7 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
   }
 
   @Override
-  public String getCurretnDagName() {
+  public String getCurrentDagName() {
     return getDag().getName();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index d3f1c44..9ecee5b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -36,8 +36,12 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.common.*;
 import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezConverterUtils;
+import org.apache.tez.common.TezLocalResource;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.common.security.TokenCache;
@@ -199,7 +203,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
   }
 
   @Override
-  public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason) {
+  public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, String diagnostics) {
     ContainerInfo containerInfo = registeredContainers.remove(containerId);
     if (containerInfo != null) {
       synchronized(containerInfo) {
@@ -245,7 +249,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
 
 
   @Override
-  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason) {
+  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason, String diagnostics) {
     TaskAttempt taskAttempt = new TaskAttempt(taskAttemptID);
     ContainerId containerId = attemptToContainerMap.remove(taskAttempt);
     if(containerId == null) {

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 17f5675..6b474ff 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -82,7 +82,6 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.DAGAppMasterState;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
@@ -180,6 +179,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private final AppContext appContext;
   private final UserGroupInformation dagUGI;
   private final ACLManager aclManager;
+  private final org.apache.tez.dag.api.Vertex.VertexExecutionContext defaultExecutionContext;
   @VisibleForTesting
   StateChangeNotifier entityUpdateTracker;
 
@@ -538,6 +538,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     // this is only for recovery in case it does not call the init transition
     this.startDAGCpuTime = appContext.getCumulativeCPUTime();
     this.startDAGGCTime = appContext.getCumulativeGCTime();
+    if (jobPlan.hasDefaultExecutionContext()) {
+      defaultExecutionContext = DagTypeConverters.convertFromProto(jobPlan.getDefaultExecutionContext());
+    } else {
+      defaultExecutionContext = null;
+    }
     
     this.taskSpecificLaunchCmdOption = new TaskSpecificLaunchCmdOption(dagConf);
     // This "this leak" is okay because the retained pointer is in an
@@ -718,11 +723,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
   @Override
   public org.apache.tez.dag.api.Vertex.VertexExecutionContext getDefaultExecutionContext() {
-    if (jobPlan.hasDefaultExecutionContext()) {
-      return DagTypeConverters.convertFromProto(jobPlan.getDefaultExecutionContext());
-    } else {
-      return null;
-    }
+    return defaultExecutionContext;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 65ea3fb..c6d8a7e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -227,7 +227,7 @@ public class TaskAttemptImpl implements TaskAttempt,
           TaskAttemptEventType.TA_KILL_REQUEST,
           new TerminatedBeforeRunningTransition(KILLED_HELPER))
       .addTransition(TaskAttemptStateInternal.START_WAIT,
-          TaskAttemptStateInternal.KILL_IN_PROGRESS,
+          TaskAttemptStateInternal.KILLED,
           TaskAttemptEventType.TA_KILLED,
           new TerminatedBeforeRunningTransition(KILLED_HELPER))
       .addTransition(TaskAttemptStateInternal.START_WAIT,
@@ -267,7 +267,7 @@ public class TaskAttemptImpl implements TaskAttempt,
           TaskAttemptEventType.TA_KILL_REQUEST,
           new TerminatedWhileRunningTransition(KILLED_HELPER))
       .addTransition(TaskAttemptStateInternal.RUNNING,
-          TaskAttemptStateInternal.KILL_IN_PROGRESS,
+          TaskAttemptStateInternal.KILLED,
           TaskAttemptEventType.TA_KILLED,
           new TerminatedWhileRunningTransition(KILLED_HELPER))
       .addTransition(TaskAttemptStateInternal.RUNNING,
@@ -1095,7 +1095,7 @@ public class TaskAttemptImpl implements TaskAttempt,
 
       // Compute node/rack location request even if re-scheduled.
       Set<String> racks = new HashSet<String>();
-      // TODO Post TEZ-2003. Allow for a policy in the VMPlugin to define localicty for different attempts.
+      // TODO Post TEZ-2003. Allow for a policy in the VMPlugin to define locality for different attempts.
       TaskLocationHint locationHint = ta.getTaskLocationHint();
       if (locationHint != null) {
         if (locationHint.getRacks() != null) {
@@ -1266,6 +1266,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       if (sendSchedulerEvent()) {
         ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, helper
             .getTaskAttemptState(), TezUtilsInternal.toTaskAttemptEndReason(ta.terminationCause),
+            ta instanceof DiagnosableEvent ? ((DiagnosableEvent)ta).getDiagnosticInfo() : null,
             ta.getVertex().getTaskSchedulerIdentifier()));
       }
     }
@@ -1348,7 +1349,7 @@ public class TaskAttemptImpl implements TaskAttempt,
 
       // Inform the Scheduler.
       ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId,
-          TaskAttemptState.SUCCEEDED, null, ta.getVertex().getTaskSchedulerIdentifier()));
+          TaskAttemptState.SUCCEEDED, null, null, ta.getVertex().getTaskSchedulerIdentifier()));
 
       // Inform the task.
       ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index d0cee21..b56bd5b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -70,7 +70,7 @@ public class ContainerLauncherRouter extends AbstractService
                                  TaskAttemptListener taskAttemptListener,
                                  String workingDirectory,
                                  List<NamedEntityDescriptor> containerLauncherDescriptors,
-                                 boolean isPureLocalMode) throws UnknownHostException {
+                                 boolean isPureLocalMode) {
     super(ContainerLauncherRouter.class.getName());
 
     this.appContext = context;
@@ -101,8 +101,7 @@ public class ContainerLauncherRouter extends AbstractService
       TaskAttemptListener taskAttemptListener,
       String workingDirectory,
       int containerLauncherIndex,
-      boolean isPureLocalMode) throws
-      UnknownHostException {
+      boolean isPureLocalMode) {
     if (containerLauncherDescriptor.getEntityName().equals(
         TezConstants.getTezYarnServicePluginName())) {
       return createYarnContainerLauncher(containerLauncherContext);
@@ -126,15 +125,18 @@ public class ContainerLauncherRouter extends AbstractService
                                                 AppContext context,
                                                 TaskAttemptListener taskAttemptListener,
                                                 String workingDirectory,
-                                                boolean isPureLocalMode) throws
-      UnknownHostException {
+                                                boolean isPureLocalMode) {
     LOG.info("Creating LocalContainerLauncher");
     // TODO Post TEZ-2003. LocalContainerLauncher is special cased, since it makes use of
     // extensive internals which are only available at runtime. Will likely require
     // some kind of runtime binding of parameters in the payload to work correctly.
-    return
-        new LocalContainerLauncher(containerLauncherContext, context, taskAttemptListener,
-            workingDirectory, isPureLocalMode);
+    try {
+      return
+          new LocalContainerLauncher(containerLauncherContext, context, taskAttemptListener,
+              workingDirectory, isPureLocalMode);
+    } catch (UnknownHostException e) {
+      throw new TezUncheckedException(e);
+    }
   }
 
   @VisibleForTesting
@@ -149,7 +151,6 @@ public class ContainerLauncherRouter extends AbstractService
     try {
       Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
           .getConstructor(ContainerLauncherContext.class);
-      ctor.setAccessible(true);
       return ctor.newInstance(containerLauncherContext);
     } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
       throw new TezUncheckedException(e);

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
index 33763e7..ccc5465 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
@@ -29,15 +29,17 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
   private final ContainerId containerId;
   private final TaskAttemptState state;
   private final TaskAttemptEndReason taskAttemptEndReason;
+  private final String diagnostics;
   private final int schedulerId;
 
   public AMSchedulerEventTAEnded(TaskAttempt attempt, ContainerId containerId,
-      TaskAttemptState state, TaskAttemptEndReason taskAttemptEndReason, int schedulerId) {
+      TaskAttemptState state, TaskAttemptEndReason taskAttemptEndReason, String diagnostics, int schedulerId) {
     super(AMSchedulerEventType.S_TA_ENDED);
     this.attempt = attempt;
     this.containerId = containerId;
     this.state = state;
     this.taskAttemptEndReason = taskAttemptEndReason;
+    this.diagnostics = diagnostics;
     this.schedulerId = schedulerId;
   }
 
@@ -64,4 +66,8 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
   public TaskAttemptEndReason getTaskAttemptEndReason() {
     return taskAttemptEndReason;
   }
+
+  public String getDiagnostics() {
+    return diagnostics;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index befde94..f77a9a9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -131,7 +131,7 @@ public class LocalTaskSchedulerService extends TaskScheduler {
   }
   
   @Override
-  public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason) {
+  public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason, String diagnostics) {
     return taskRequestHandler.addDeallocateTaskRequest(task);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index f001909..7d2e768 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -286,7 +286,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
     TaskAttempt attempt = event.getAttempt();
     // Propagate state and failure cause (if any) when informing the scheduler about the de-allocation.
     boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()]
-        .deallocateTask(attempt, false, event.getTaskAttemptEndReason());
+        .deallocateTask(attempt, false, event.getTaskAttemptEndReason(), event.getDiagnostics());
     // use stored value of container id in case the scheduler has removed this
     // assignment because the task has been deallocated earlier.
     // retroactive case
@@ -331,7 +331,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
     }
 
     boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt,
-        true, null);
+        true, null, event.getDiagnostics());
     if (!wasContainerAllocated) {
       LOG.error("De-allocated successful task: " + attempt.getID()
           + ", but TaskScheduler reported no container assigned to task");
@@ -436,7 +436,6 @@ public class TaskSchedulerEventHandler extends AbstractService implements
     try {
       Constructor<? extends TaskScheduler> ctor = taskSchedulerClazz
           .getConstructor(TaskSchedulerContext.class);
-      ctor.setAccessible(true);
       return ctor.newInstance(taskSchedulerContext);
     } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
       throw new TezUncheckedException(e);

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 940c5b0..64d0fd2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -980,7 +980,8 @@ public class YarnTaskSchedulerService extends TaskScheduler
    */
   @Override
   public boolean deallocateTask(Object task, boolean taskSucceeded,
-                                TaskAttemptEndReason endReason) {
+                                TaskAttemptEndReason endReason,
+                                String diagnostics) {
     Map<CookieContainerRequest, Container> assignedContainers = null;
 
     synchronized (this) {
@@ -1170,7 +1171,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
             CookieContainerRequest request = entry.getValue();
             if (request.getPriority().equals(lowestPriNewContainer.getPriority())) {
               LOG.info("Resending request for task again: " + task);
-              deallocateTask(task, true, null);
+              deallocateTask(task, true, null, null);
               allocateTask(task, request.getCapability(), 
                   (request.getNodes() == null ? null : 
                     request.getNodes().toArray(new String[request.getNodes().size()])), 

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index aeacf84..99cec2b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -631,14 +631,14 @@ public class AMContainerImpl implements AMContainer {
       SingleArcTransition<AMContainerImpl, AMContainerEvent> {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) cEvent;
       if (container.currentAttempt != null) {
-        AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) cEvent;
         // for a properly setup cluster this should almost always be an app error
         // need to differentiate between launch failed due to framework/cluster or app
         container.sendTerminatingToTaskAttempt(container.currentAttempt,
             event.getMessage(), TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED);
       }
-      container.unregisterFromTAListener(ContainerEndReason.LAUNCH_FAILED);
+      container.unregisterFromTAListener(ContainerEndReason.LAUNCH_FAILED, event.getMessage());
       container.deAllocate();
     }
   }
@@ -668,7 +668,7 @@ public class AMContainerImpl implements AMContainer {
       }
       container.containerLocalResources = null;
       container.additionalLocalResources = null;
-      container.unregisterFromTAListener(event.getContainerEndReason());
+      container.unregisterFromTAListener(event.getContainerEndReason(), event.getDiagnostics());
       String diag = event.getDiagnostics();
       if (!(diag == null || diag.equals(""))) {
         LOG.info("Container " + container.getContainerId()
@@ -694,7 +694,7 @@ public class AMContainerImpl implements AMContainer {
         container.sendTerminatingToTaskAttempt(container.currentAttempt,
             getMessage(container, cEvent), TaskAttemptTerminationCause.CONTAINER_STOPPED);
       }
-      container.unregisterFromTAListener(ContainerEndReason.OTHER);
+      container.unregisterFromTAListener(ContainerEndReason.OTHER, getMessage(container, cEvent));
       container.logStopped(container.currentAttempt == null ?
           ContainerExitStatus.SUCCESS 
           : ContainerExitStatus.INVALID);
@@ -746,7 +746,11 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       super.transition(container, cEvent);
-      container.unregisterFromTAListener(ContainerEndReason.NODE_FAILED);
+      String errorMessage = "Node " + container.getContainer().getNodeId() + " failed. ";
+      if (cEvent instanceof DiagnosableEvent) {
+        errorMessage += ((DiagnosableEvent) cEvent).getDiagnosticInfo();
+      }
+      container.unregisterFromTAListener(ContainerEndReason.NODE_FAILED, errorMessage);
       container.deAllocate();
     }
   }
@@ -756,14 +760,15 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       super.transition(container, cEvent);
+      String errorMessage = "Container " + container.getContainerId() +
+          " hit an invalid transition - " + cEvent.getType() + " at " +
+          container.getState();
       if (container.currentAttempt != null) {
         container.sendTerminatingToTaskAttempt(container.currentAttempt,
-            "Container " + container.getContainerId() +
-                " hit an invalid transition - " + cEvent.getType() + " at " +
-                container.getState(), TaskAttemptTerminationCause.FRAMEWORK_ERROR);
+            errorMessage, TaskAttemptTerminationCause.FRAMEWORK_ERROR);
       }
       container.logStopped(ContainerExitStatus.ABORTED);
-      container.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR);
+      container.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR, errorMessage);
       container.sendStopRequestToNM();
     }
   }
@@ -835,7 +840,12 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
 
       AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
-      container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.FRAMEWORK_ERROR);
+      String errorMessage = "AMScheduler Error: Multiple simultaneous " +
+          "taskAttempt allocations to: " + container.getContainerId() +
+          ". Attempts: " + container.getCurrentTaskAttempt() + ", " + event.getTaskAttemptId() +
+          ". Current state: " + container.getState();
+      container.unregisterAttemptFromListener(container.currentAttempt,
+          TaskAttemptEndReason.FRAMEWORK_ERROR, errorMessage);
       container.handleExtraTAAssign(event, container.currentAttempt);
     }
   }
@@ -846,7 +856,7 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       container.lastTaskFinishTime = System.currentTimeMillis();
       container.completedAttempts.add(container.currentAttempt);
-      container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.OTHER);
+      container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.OTHER, null);
       container.currentAttempt = null;
     }
   }
@@ -863,7 +873,9 @@ public class AMContainerImpl implements AMContainer {
         container.sendTerminatedToTaskAttempt(container.currentAttempt,
             getMessage(container, event), event.getTerminationCause());
       }
-      container.unregisterAttemptFromListener(container.currentAttempt, TezUtilsInternal.toTaskAttemptEndReason(event.getTerminationCause()));
+      container.unregisterAttemptFromListener(container.currentAttempt,
+          TezUtilsInternal.toTaskAttemptEndReason(event.getTerminationCause()),
+          getMessage(container, event));
       container.registerFailedAttempt(container.currentAttempt);
       container.currentAttempt= null;
       super.transition(container, cEvent);
@@ -873,7 +885,8 @@ public class AMContainerImpl implements AMContainer {
   protected static class StopRequestAtRunningTransition
       extends StopRequestAtIdleTransition {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
-      container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.OTHER);
+      container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.OTHER,
+          getMessage(container, cEvent));
       super.transition(container, cEvent);
     }
   }
@@ -894,7 +907,8 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       super.transition(container, cEvent);
-      container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.NODE_FAILED);
+      String errorMessage = "Node " + container.getContainer().getNodeId() + " failed. ";
+      container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.NODE_FAILED, errorMessage);
     }
   }
 
@@ -903,11 +917,13 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       super.transition(container, cEvent);
-      container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.FRAMEWORK_ERROR);
+      String errorMessage = "Container " + container.getContainerId() +
+          " hit an invalid transition - " + cEvent.getType() + " at " +
+          container.getState();
+      container.unregisterAttemptFromListener(container.currentAttempt,
+          TaskAttemptEndReason.FRAMEWORK_ERROR, errorMessage);
       container.sendTerminatingToTaskAttempt(container.currentAttempt,
-          "Container " + container.getContainerId() +
-              " hit an invalid transition - " + cEvent.getType() + " at " +
-              container.getState(), TaskAttemptTerminationCause.FRAMEWORK_ERROR);
+          errorMessage, TaskAttemptTerminationCause.FRAMEWORK_ERROR);
     }
   }
 
@@ -1029,7 +1045,7 @@ public class AMContainerImpl implements AMContainer {
     LOG.warn(errorMessage);
     this.logStopped(ContainerExitStatus.INVALID);
     this.sendStopRequestToNM();
-    this.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR);
+    this.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR, errorMessage);
     this.unregisterFromContainerListener();
   }
 
@@ -1087,8 +1103,8 @@ public class AMContainerImpl implements AMContainer {
         container.getNodeId(), container.getContainerToken(), launcherId, schedulerId, taskCommId));
   }
 
-  protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId, TaskAttemptEndReason endReason) {
-    taskAttemptListener.unregisterTaskAttempt(attemptId, taskCommId, endReason);
+  protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId, TaskAttemptEndReason endReason, String diagnostics) {
+    taskAttemptListener.unregisterTaskAttempt(attemptId, taskCommId, endReason, diagnostics);
   }
 
   protected void registerAttemptWithListener(AMContainerTask amContainerTask) {
@@ -1099,8 +1115,8 @@ public class AMContainerImpl implements AMContainer {
     taskAttemptListener.registerRunningContainer(containerId, taskCommId);
   }
 
-  protected void unregisterFromTAListener(ContainerEndReason endReason) {
-    this.taskAttemptListener.unregisterRunningContainer(containerId, taskCommId, endReason);
+  protected void unregisterFromTAListener(ContainerEndReason endReason, String diagnostics) {
+    this.taskAttemptListener.unregisterRunningContainer(containerId, taskCommId, endReason, diagnostics);
   }
 
   protected void registerWithContainerListener() {

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
index 0668ff2..32e515b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -53,7 +53,7 @@ public class AMNodeTracker extends AbstractService implements
 
   @SuppressWarnings("rawtypes")
   public AMNodeTracker(EventHandler eventHandler, AppContext appContext) {
-    super("AMNodeMap");
+    super("AMNodeTracker");
     this.perSourceNodeTrackers = new ConcurrentHashMap<>();
     this.eventHandler = eventHandler;
     this.appContext = appContext;

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 4d404b9..5159aff 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -184,12 +184,12 @@ public class TestTaskAttemptListenerImplTezDag {
     assertEquals(taskSpec, containerTask.getTaskSpec());
 
     // Task unregistered. Should respond to heartbeats
-    taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0, TaskAttemptEndReason.OTHER);
+    taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0, TaskAttemptEndReason.OTHER, null);
     containerTask = tezUmbilical.getTask(containerContext2);
     assertNull(containerTask);
 
     // Container unregistered. Should send a shouldDie = true
-    taskAttemptListener.unregisterRunningContainer(containerId2, 0, ContainerEndReason.OTHER);
+    taskAttemptListener.unregisterRunningContainer(containerId2, 0, ContainerEndReason.OTHER, null);
     containerTask = tezUmbilical.getTask(containerContext2);
     assertTrue(containerTask.shouldDie());
 
@@ -203,7 +203,7 @@ public class TestTaskAttemptListenerImplTezDag {
     doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID();
     AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec, null, null, false, 0);
     taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3, 0);
-    taskAttemptListener.unregisterRunningContainer(containerId3, 0, ContainerEndReason.OTHER);
+    taskAttemptListener.unregisterRunningContainer(containerId3, 0, ContainerEndReason.OTHER, null);
     containerTask = tezUmbilical.getTask(containerContext3);
     assertTrue(containerTask.shouldDie());
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
index abb5e42..74468f2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
@@ -41,7 +41,6 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.app.dag.DAG;
@@ -110,7 +109,7 @@ public class TestTaskAttemptListenerImplTezDag2 {
     taskAttemptListener
         .taskFailed(taskAttemptId1, TaskAttemptEndReason.COMMUNICATION_ERROR, "Diagnostics1");
     taskAttemptListener
-        .taskKilled(taskAttemptId2, TaskAttemptEndReason.SERVICE_BUSY, "Diagnostics2");
+        .taskKilled(taskAttemptId2, TaskAttemptEndReason.EXECUTOR_BUSY, "Diagnostics2");
 
     ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class);
     verify(eventHandler, times(2)).handle(argumentCaptor.capture());

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
new file mode 100644
index 0000000..1545eb4
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.app.rm.container.AMContainerMap;
+import org.junit.Test;
+
+public class TestTaskCommunicatorContextImpl {
+
+  @Test(timeout = 5000)
+  public void testIsKnownContainer() {
+    AppContext appContext = mock(AppContext.class);
+    TaskAttemptListenerImpTezDag tal = mock(TaskAttemptListenerImpTezDag.class);
+
+    AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), tal, mock(
+        ContainerSignatureMatcher.class), appContext);
+
+    doReturn(amContainerMap).when(appContext).getAllContainers();
+
+    ContainerId containerId01 = mock(ContainerId.class);
+    Container container01 = mock(Container.class);
+    doReturn(containerId01).when(container01).getId();
+
+    ContainerId containerId11 = mock(ContainerId.class);
+    Container container11 = mock(Container.class);
+    doReturn(containerId11).when(container11).getId();
+
+    amContainerMap.addContainerIfNew(container01, 0, 0, 0);
+    amContainerMap.addContainerIfNew(container11, 1, 1, 1);
+
+    TaskCommunicatorContext taskCommContext0 = new TaskCommunicatorContextImpl(appContext, tal, null, 0);
+    TaskCommunicatorContext taskCommContext1 = new TaskCommunicatorContextImpl(appContext, tal, null, 1);
+
+    assertTrue(taskCommContext0.isKnownContainer(containerId01));
+    assertFalse(taskCommContext0.isKnownContainer(containerId11));
+
+    assertFalse(taskCommContext1.isKnownContainer(containerId01));
+    assertTrue(taskCommContext1.isKnownContainer(containerId11));
+
+    taskCommContext0.containerAlive(containerId01);
+    verify(tal).containerAlive(containerId01);
+    reset(tal);
+
+    taskCommContext0.containerAlive(containerId11);
+    verify(tal, never()).containerAlive(containerId11);
+    reset(tal);
+
+    taskCommContext1.containerAlive(containerId01);
+    verify(tal, never()).containerAlive(containerId01);
+    reset(tal);
+
+    taskCommContext1.containerAlive(containerId11);
+    verify(tal).containerAlive(containerId11);
+    reset(tal);
+
+    taskCommContext1.containerAlive(containerId01);
+    verify(tal, never()).containerAlive(containerId01);
+    reset(tal);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
index c76aa50..4f68fab 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
@@ -328,7 +328,7 @@ public class TestTaskCommunicatorManager {
     }
 
     @Override
-    public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason) {
+    public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, String diagnostics) {
 
     }
 
@@ -342,7 +342,7 @@ public class TestTaskCommunicatorManager {
 
     @Override
     public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID,
-                                             TaskAttemptEndReason endReason) {
+                                             TaskAttemptEndReason endReason, String diagnostics) {
 
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 2bf1c85..947ea93 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -308,6 +308,7 @@ public class TestTaskAttempt {
         resource, createFakeContainerContext(), false);
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    @SuppressWarnings("deprecation")
     ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
     Container container = mock(Container.class);
     when(container.getId()).thenReturn(contId);
@@ -351,6 +352,7 @@ public class TestTaskAttempt {
     Resource resource = Resource.newInstance(1024, 1);
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    @SuppressWarnings("deprecation")
     ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
     Container container = mock(Container.class);
     when(container.getId()).thenReturn(contId);
@@ -453,6 +455,7 @@ public class TestTaskAttempt {
     Resource resource = Resource.newInstance(1024, 1);
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    @SuppressWarnings("deprecation")
     ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
     Container container = mock(Container.class);
     when(container.getId()).thenReturn(contId);
@@ -519,6 +522,7 @@ public class TestTaskAttempt {
     Resource resource = Resource.newInstance(1024, 1);
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    @SuppressWarnings("deprecation")
     ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
     Container container = mock(Container.class);
     when(container.getId()).thenReturn(contId);
@@ -613,6 +617,7 @@ public class TestTaskAttempt {
     Resource resource = Resource.newInstance(1024, 1);
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    @SuppressWarnings("deprecation")
     ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
     Container container = mock(Container.class);
     when(container.getId()).thenReturn(contId);
@@ -745,6 +750,7 @@ public class TestTaskAttempt {
     Resource resource = Resource.newInstance(1024, 1);
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    @SuppressWarnings("deprecation")
     ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
     Container container = mock(Container.class);
     when(container.getId()).thenReturn(contId);
@@ -837,6 +843,7 @@ public class TestTaskAttempt {
     Resource resource = Resource.newInstance(1024, 1);
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    @SuppressWarnings("deprecation")
     ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
     Container container = mock(Container.class);
     when(container.getId()).thenReturn(contId);
@@ -933,6 +940,7 @@ public class TestTaskAttempt {
     Resource resource = Resource.newInstance(1024, 1);
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    @SuppressWarnings("deprecation")
     ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
     Container container = mock(Container.class);
     when(container.getId()).thenReturn(contId);
@@ -1037,6 +1045,7 @@ public class TestTaskAttempt {
     Resource resource = Resource.newInstance(1024, 1);
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    @SuppressWarnings("deprecation")
     ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
     Container container = mock(Container.class);
     when(container.getId()).thenReturn(contId);
@@ -1138,6 +1147,7 @@ public class TestTaskAttempt {
     Resource resource = Resource.newInstance(1024, 1);
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    @SuppressWarnings("deprecation")
     ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
     Container container = mock(Container.class);
     when(container.getId()).thenReturn(contId);
@@ -1342,7 +1352,7 @@ public class TestTaskAttempt {
         }
       }
     }
-  };
+  }
 
   private class MockTaskAttemptImpl extends TaskAttemptImpl {
     

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java
index 62a5f19..d0caf8c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java
@@ -273,8 +273,7 @@ public class TestContainerLauncherRouter {
                                               TaskAttemptListener taskAttemptListener,
                                               String workingDirectory,
                                               int containerLauncherIndex,
-                                              boolean isPureLocalMode) throws
-        UnknownHostException {
+                                              boolean isPureLocalMode) {
       numContainerLaunchers.incrementAndGet();
       boolean added = containerLauncherIndices.add(containerLauncherIndex);
       assertTrue("Cannot add multiple launchers with the same index", added);
@@ -298,8 +297,7 @@ public class TestContainerLauncherRouter {
                                                   AppContext context,
                                                   TaskAttemptListener taskAttemptListener,
                                                   String workingDirectory,
-                                                  boolean isPureLocalMode) throws
-        UnknownHostException {
+                                                  boolean isPureLocalMode) {
       uberContainerLauncherCreated.set(true);
       testContainerLaunchers.add(uberContainerlauncher);
       return uberContainerlauncher;