You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/22 03:19:16 UTC

[26/50] [abbrv] tez git commit: TEZ-2004. Define basic interface for pluggable ContainerLaunchers. (sseth)

TEZ-2004. Define basic interface for pluggable ContainerLaunchers. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/eec648d3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/eec648d3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/eec648d3

Branch: refs/heads/TEZ-2003
Commit: eec648d3af68ef1b4967e5c7cfc464a476f308c6
Parents: 3a143a6
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Jul 20 15:52:24 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 21 18:13:56 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |   1 +
 .../serviceplugins/api/ContainerEndReason.java  |  31 +++++
 .../api/ContainerLaunchRequest.java             |  81 +++++++++++
 .../serviceplugins/api/ContainerLauncher.java   |  46 +++++++
 .../api/ContainerLauncherContext.java           |  54 ++++++++
 .../api/ContainerLauncherOperationBase.java     |  58 ++++++++
 .../api/ContainerStopRequest.java               |  47 +++++++
 .../api/TaskAttemptEndReason.java               |  32 +++++
 .../org/apache/tez/common/TezUtilsInternal.java |   9 +-
 .../apache/tez/dag/api/ContainerEndReason.java  |  27 ----
 .../tez/dag/api/TaskAttemptEndReason.java       |  27 ----
 .../apache/tez/dag/api/TaskCommunicator.java    |   9 ++
 .../tez/dag/api/TaskCommunicatorContext.java    |   2 +-
 .../tez/dag/api/TaskCommunicatorInterface.java  |  18 +++
 .../java/org/apache/tez/dag/app/AppContext.java |   5 +
 .../dag/app/ContainerLauncherContextImpl.java   | 101 ++++++++++++++
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  15 +++
 .../apache/tez/dag/app/TaskAttemptListener.java |   6 +-
 .../dag/app/TaskAttemptListenerImpTezDag.java   |   4 +-
 .../dag/app/TaskCommunicatorContextImpl.java    |   6 +-
 .../tez/dag/app/TezTaskCommunicatorImpl.java    |  12 +-
 .../tez/dag/app/launcher/ContainerLauncher.java |  29 ----
 .../dag/app/launcher/ContainerLauncherImpl.java | 128 ++++++++----------
 .../app/launcher/ContainerLauncherRouter.java   |  52 +++++++-
 .../tez/dag/app/launcher/ContainerOp.java       |  62 +++++++++
 .../app/launcher/LocalContainerLauncher.java    | 123 ++++++++---------
 .../tez/dag/app/rm/AMSchedulerEventTAEnded.java |   2 +-
 .../dag/app/rm/LocalTaskSchedulerService.java   |   2 +-
 .../tez/dag/app/rm/NMCommunicatorEvent.java     |  18 ++-
 .../rm/NMCommunicatorLaunchRequestEvent.java    |  11 +-
 .../app/rm/NMCommunicatorStopRequestEvent.java  |   4 +-
 .../tez/dag/app/rm/TaskSchedulerService.java    |   2 +-
 .../dag/app/rm/YarnTaskSchedulerService.java    |   2 +-
 .../rm/container/AMContainerEventCompleted.java |   2 +-
 .../dag/app/rm/container/AMContainerImpl.java   |   9 +-
 .../apache/tez/dag/app/MockDAGAppMaster.java    |  63 ++++-----
 .../app/TestTaskAttemptListenerImplTezDag.java  |   4 +-
 .../app/TestTaskAttemptListenerImplTezDag2.java |   2 +-
 .../tez/dag/app/rm/TestContainerReuse.java      |   4 +-
 .../app/rm/TestLocalTaskSchedulerService.java   |   1 -
 .../dag/app/rm/container/TestAMContainer.java   |   4 +-
 .../TezTestServiceContainerLauncher.java        | 133 +++++++++----------
 .../TezTestServiceNoOpContainerLauncher.java    |  53 +++-----
 .../rm/TezTestServiceTaskSchedulerService.java  |   2 +-
 .../TezTestServiceTaskCommunicatorImpl.java     |   4 +-
 45 files changed, 887 insertions(+), 420 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 604947c..88dd0c7 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -34,5 +34,6 @@ ALL CHANGES:
   TEZ-2526. Fix version for tez-history-parser.
   TEZ-2621. rebase 07/14
   TEZ-2124. Change Node tracking to work per external container source.
+  TEZ-2004. Define basic interface for pluggable ContainerLaunchers.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerEndReason.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerEndReason.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerEndReason.java
new file mode 100644
index 0000000..ab8619f
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerEndReason.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public enum ContainerEndReason {
+  NODE_FAILED, // Completed because the node running the container was marked as dead
+  INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision
+  EXTERNAL_PREEMPTION, // Preempted due to cluster contention
+  APPLICATION_ERROR, // An error in the AM caused by user code
+  FRAMEWORK_ERROR, // An error in the AM - likely a bug.
+  LAUNCH_FAILED, // Failure to launch the container
+  COMPLETED, // Completed via normal flow
+  OTHER
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java
new file mode 100644
index 0000000..cfd7ca7
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ContainerLaunchRequest extends ContainerLauncherOperationBase {
+
+  private final ContainerLaunchContext clc;
+  private final Container container;
+  private final String schedulerName;
+  private final String taskCommName;
+
+  public ContainerLaunchRequest(NodeId nodeId,
+                                ContainerId containerId,
+                                Token containerToken,
+                                ContainerLaunchContext clc,
+                                Container container, String schedulerName, String taskCommName) {
+    super(nodeId, containerId, containerToken);
+    this.clc = clc;
+    this.container = container;
+    this.schedulerName = schedulerName;
+    this.taskCommName = taskCommName;
+  }
+
+
+  // TODO Post TEZ-2003. TEZ-2625. ContainerLaunchContext needs to be built here instead of being passed in.
+  // Basic specifications need to be provided here
+  public ContainerLaunchContext getContainerLaunchContext() {
+    return clc;
+  }
+
+  /**
+   * Get the name of the task communicator which will be used to communicate
+   * with the task that will run in this container.
+   * @return
+   */
+  public String getTaskCommunicatorName() {
+    return taskCommName;
+  }
+
+  /**
+   * Get the name of the scheduler which allocated this container.
+   * @return
+   */
+  public String getSchedulerName() {
+    return schedulerName;
+  }
+
+  @Override
+  public String toString() {
+    return "ContainerLaunchRequest{" +
+        "nodeId=" + getNodeId() +
+        ", containerId=" + getContainerId() +
+        ", clc=" + clc +
+        ", container=" + container +
+        ", schedulerName='" + schedulerName + '\'' +
+        ", taskCommName='" + taskCommName + '\'' +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
new file mode 100644
index 0000000..218edb6
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.AbstractService;
+
+/**
+ * Plugin to allow custom container launchers to be written to launch containers on different types
+ * of executors.
+ */
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public abstract class ContainerLauncher extends AbstractService {
+
+  private final ContainerLauncherContext containerLauncherContext;
+
+  // TODO TEZ-2003 Simplify this by moving away from AbstractService. Potentially Guava AbstractService.
+  // A serviceInit(Configuration) is not likely to be very useful, and will expose unnecessary internal
+  // configuration to the services if populated with the AM Configuration
+  public ContainerLauncher(String name, ContainerLauncherContext containerLauncherContext) {
+    super(name);
+    this.containerLauncherContext = containerLauncherContext;
+  }
+
+  public final ContainerLauncherContext getContext() {
+    return this.containerLauncherContext;
+  }
+
+  public abstract void launchContainer(ContainerLaunchRequest launchRequest);
+  public abstract void stopContainer(ContainerStopRequest stopRequest);
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
new file mode 100644
index 0000000..836dc4a
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ContainerLauncherContext {
+
+  // TODO Post TEZ-2003. Tez abstraction for ContainerId, NodeId, other YARN constructs
+
+  // Reporting APIs
+  void containerLaunched(ContainerId containerId);
+
+  void containerLaunchFailed(ContainerId containerId, String diagnostics);
+
+  void containerStopRequested(ContainerId containerId);
+
+  void containerStopFailed(ContainerId containerId, String diagnostics);
+
+  // TODO Post TEZ-2003. TaskAttemptEndReason does not belong here, and is an unnecessary leak.
+  // ContainerCompleted is normally generated by the scheduler in case of YARN since the RM informs about completion.
+  // For other sources, there may not be a central entity making this information available. The ContainerLauncher
+  // on the stop request will likely be the best place to generate it.
+  void containerCompleted(ContainerId containerId, int exitStatus, String diagnostics, TaskAttemptEndReason endReason);
+
+  // Lookup APIs
+
+  // TODO TEZ-2003. To be replaced by getInitialPayload once the DAG API is changed.
+  Configuration getInitialConfiguration();
+
+  int getNumNodes(String sourceName);
+
+  ApplicationAttemptId getApplicationAttemptId();
+
+  Object getTaskCommunicatorMetaInfo(String taskCommName);
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java
new file mode 100644
index 0000000..29e0420
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ContainerLauncherOperationBase {
+
+  private final NodeId nodeId;
+  private final ContainerId containerId;
+  private final Token containerToken;
+
+  public ContainerLauncherOperationBase(NodeId nodeId,
+                                        ContainerId containerId,
+                                        Token containerToken) {
+    this.nodeId = nodeId;
+    this.containerId = containerId;
+    this.containerToken = containerToken;
+  }
+
+  public NodeId getNodeId() {
+    return nodeId;
+  }
+
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  public Token getContainerToken() {
+    return containerToken;
+  }
+
+  @Override
+  public String toString() {
+    return "ContainerLauncherOperationBase{" +
+        "nodeId=" + nodeId +
+        ", containerId=" + containerId +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java
new file mode 100644
index 0000000..cb0af31
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ContainerStopRequest extends ContainerLauncherOperationBase {
+
+  private final String schedulerName;
+  private final String taskCommName;
+
+  public ContainerStopRequest(NodeId nodeId,
+                              ContainerId containerId,
+                              Token containerToken, String schedulerName, String taskCommName) {
+    super(nodeId, containerId, containerToken);
+    this.schedulerName = schedulerName;
+    this.taskCommName = taskCommName;
+  }
+
+  @Override
+  public String toString() {
+    return "ContainerStopRequest{" +
+        "nodeId=" + getNodeId() +
+        ", containerId=" + getContainerId() +
+        ", schedulerName='" + schedulerName + '\'' +
+        ", taskCommName='" + taskCommName + '\'' +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java
new file mode 100644
index 0000000..4255c28
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public enum TaskAttemptEndReason {
+  NODE_FAILED, // Completed because the node running the container was marked as dead
+  COMMUNICATION_ERROR, // Communication error with the task
+  SERVICE_BUSY, // External service busy
+  INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision
+  EXTERNAL_PREEMPTION, // Preempted due to cluster contention
+  APPLICATION_ERROR, // An error in the AM caused by user code
+  FRAMEWORK_ERROR, // An error in the AM - likely a bug.
+  CONTAINER_EXITED,
+  OTHER // Unknown reason
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 0bdeb79..4c8c227 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -35,13 +35,12 @@ import java.util.zip.Inflater;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.TextFormat;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.log4j.Appender;
 import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
@@ -256,6 +255,8 @@ public class TezUtilsInternal {
         return TaskAttemptTerminationCause.FRAMEWORK_ERROR;
       case NODE_FAILED:
         return TaskAttemptTerminationCause.NODE_FAILED;
+      case CONTAINER_EXITED:
+        return TaskAttemptTerminationCause.CONTAINER_EXITED;
       case OTHER:
         return TaskAttemptTerminationCause.UNKNOWN_ERROR;
       default:
@@ -283,6 +284,8 @@ public class TezUtilsInternal {
         return TaskAttemptEndReason.FRAMEWORK_ERROR;
       case NODE_FAILED:
         return TaskAttemptEndReason.NODE_FAILED;
+      case CONTAINER_EXITED:
+        return TaskAttemptEndReason.CONTAINER_EXITED;
       case INTERRUPTED_BY_SYSTEM:
       case INTERRUPTED_BY_USER:
       case UNKNOWN_ERROR:
@@ -296,7 +299,7 @@ public class TezUtilsInternal {
       case OUTPUT_LOST:
       case TASK_HEARTBEAT_ERROR:
       case CONTAINER_LAUNCH_FAILED:
-      case CONTAINER_EXITED:
+
       case CONTAINER_STOPPED:
       case NODE_DISK_ERROR:
       default:

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java b/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java
deleted file mode 100644
index e13e886..0000000
--- a/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-// TODO TEZ-2003 Expose as a public API
-public enum ContainerEndReason {
-  NODE_FAILED, // Completed because the node running the container was marked as dead
-  INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision
-  EXTERNAL_PREEMPTION, // Preempted due to cluster contention
-  APPLICATION_ERROR, // An error in the AM caused by user code
-  FRAMEWORK_ERROR, // An error in the AM - likely a bug.
-  LAUNCH_FAILED, // Failure to launch the container
-  COMPLETED, // Completed via normal flow
-  OTHER
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java b/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
deleted file mode 100644
index de78d21..0000000
--- a/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-// TODO TEZ-2003 Expose as a public API
-public enum TaskAttemptEndReason {
-  NODE_FAILED, // Completed because the node running the container was marked as dead
-  COMMUNICATION_ERROR, // Communication error with the task
-  SERVICE_BUSY, // External service busy
-  INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision
-  EXTERNAL_PREEMPTION, // Preempted due to cluster contention
-  APPLICATION_ERROR, // An error in the AM caused by user code
-  FRAMEWORK_ERROR, // An error in the AM - likely a bug.
-  OTHER // Unknown reason
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index d0a006b..05e437c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 
@@ -87,4 +89,11 @@ public abstract class TaskCommunicator extends AbstractService {
   // TODO TEZ-2003 This is extremely difficult to use. Add the dagStarted notification, and potentially
   // throw exceptions between a dagComplete and dagStart invocation.
   public abstract void dagComplete(String dagName);
+
+  /**
+   * Share meta-information such as host:port information where the Task Communicator may be listening.
+   * Primarily for use by compatible launchers to learn this information.
+   * @return
+   */
+  public abstract Object getMetaInfo();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index 56345ab..b6e63f7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -16,13 +16,13 @@ package org.apache.tez.dag.api;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Set;
 
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java
new file mode 100644
index 0000000..022cd7b
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+public interface TaskCommunicatorInterface {
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 1ccb10b..516fcef 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -118,4 +118,9 @@ public interface AppContext {
   public Integer getTaskCommunicatorIdentifier(String name);
   public Integer getTaskScheduerIdentifier(String name);
   public Integer getContainerLauncherIdentifier(String name);
+
+  public String getTaskCommunicatorName(int taskCommId);
+  public String getTaskSchedulerName(int schedulerId);
+  public String getContainerLauncherName(int launcherId);
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
new file mode 100644
index 0000000..997775a
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
+import org.apache.tez.dag.app.rm.container.AMContainerEventStopFailed;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+
+public class ContainerLauncherContextImpl implements ContainerLauncherContext {
+
+  private final AppContext context;
+  private final TaskAttemptListener tal;
+
+  public ContainerLauncherContextImpl(AppContext appContext, TaskAttemptListener tal) {
+    this.context = appContext;
+    this.tal = tal;
+  }
+
+  @Override
+  public void containerLaunched(ContainerId containerId) {
+    context.getEventHandler().handle(
+        new AMContainerEventLaunched(containerId));
+    ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
+        containerId, context.getClock().getTime(), context.getApplicationAttemptId());
+    context.getHistoryHandler().handle(new DAGHistoryEvent(
+        null, lEvt));
+
+  }
+
+  @Override
+  public void containerLaunchFailed(ContainerId containerId, String diagnostics) {
+    context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, diagnostics));
+  }
+
+  @Override
+  public void containerStopRequested(ContainerId containerId) {
+    context.getEventHandler().handle(
+        new AMContainerEvent(containerId, AMContainerEventType.C_NM_STOP_SENT));
+  }
+
+  @Override
+  public void containerStopFailed(ContainerId containerId, String diagnostics) {
+    context.getEventHandler().handle(
+        new AMContainerEventStopFailed(containerId, diagnostics));
+  }
+
+  @Override
+  public void containerCompleted(ContainerId containerId, int exitStatus, String diagnostics,
+                                 TaskAttemptEndReason endReason) {
+    context.getEventHandler().handle(new AMContainerEventCompleted(containerId, exitStatus, diagnostics, TezUtilsInternal
+        .fromTaskAttemptEndReason(
+            endReason)));
+  }
+
+  @Override
+  public Configuration getInitialConfiguration() {
+    return context.getAMConf();
+  }
+
+  @Override
+  public int getNumNodes(String sourceName) {
+    int sourceIndex = context.getTaskScheduerIdentifier(sourceName);
+    int numNodes = context.getNodeTracker().getNumNodes(sourceIndex);
+    return numNodes;
+  }
+
+  @Override
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return context.getApplicationAttemptId();
+  }
+
+  @Override
+  public Object getTaskCommunicatorMetaInfo(String taskCommName) {
+    int taskCommId = context.getTaskCommunicatorIdentifier(taskCommName);
+    return tal.getTaskCommunicator(taskCommId).getMetaInfo();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 2dc5a07..04e2578 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1551,6 +1551,21 @@ public class DAGAppMaster extends AbstractService {
     }
 
     @Override
+    public String getTaskCommunicatorName(int taskCommId) {
+      return taskCommunicators.inverse().get(taskCommId);
+    }
+
+    @Override
+    public String getTaskSchedulerName(int schedulerId) {
+      return taskSchedulers.inverse().get(schedulerId);
+    }
+
+    @Override
+    public String getContainerLauncherName(int launcherId) {
+      return containerLaunchers.inverse().get(launcherId);
+    }
+
+    @Override
     public Map<ApplicationAccessType, String> getApplicationACLs() {
       if (getServiceState() != STATE.STARTED) {
         throw new TezUncheckedException(

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
index 92e38ae..2eec2fb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
@@ -18,11 +18,9 @@
 
 package org.apache.tez.dag.app;
 
-import java.net.InetSocketAddress;
-
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.dag.api.ContainerEndReason;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.app.rm.container.AMContainerTask;

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index e2d44e2..47b63dd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -28,7 +28,7 @@ import java.util.concurrent.ConcurrentMap;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.collections4.ListUtils;
-import org.apache.tez.dag.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -43,7 +43,7 @@ import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskHeartbeatResponse;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 790066f..50e006d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -17,10 +17,6 @@ package org.apache.tez.dag.app;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.base.Function;
@@ -30,7 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
 import org.apache.tez.dag.api.TaskHeartbeatRequest;
 import org.apache.tez.dag.api.TaskHeartbeatResponse;

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 83322f2..0374022 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -41,8 +41,8 @@ import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.common.security.TokenCache;
-import org.apache.tez.dag.api.ContainerEndReason;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
 import org.apache.tez.dag.api.TaskHeartbeatRequest;
@@ -180,7 +180,8 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
 
   @Override
   public void registerRunningContainer(ContainerId containerId, String host, int port) {
-    ContainerInfo oldInfo = registeredContainers.putIfAbsent(containerId, new ContainerInfo(containerId, host, port));
+    ContainerInfo oldInfo = registeredContainers.putIfAbsent(containerId,
+        new ContainerInfo(containerId, host, port));
     if (oldInfo != null) {
       throw new TezUncheckedException("Multiple registrations for containerId: " + containerId);
     }
@@ -267,6 +268,11 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
     // Nothing to do at the moment. Some of the TODOs from TaskAttemptListener apply here.
   }
 
+  @Override
+  public Object getMetaInfo() {
+    return address;
+  }
+
   protected String getTokenIdentifier() {
     return tokenIdentifier;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
deleted file mode 100644
index ea07a1d..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.launcher;
-
-
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
-
-public interface ContainerLauncher
-    extends EventHandler<NMCommunicatorEvent> {
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index a12fb04..fe0178c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -30,12 +30,15 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
 import org.apache.tez.dag.api.TezConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -45,57 +48,43 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
 import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
-import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
-import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed;
-import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
-import org.apache.tez.dag.app.rm.container.AMContainerEventStopFailed;
-import org.apache.tez.dag.app.rm.container.AMContainerEventType;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 
-// TODO XXX: See what part of this lifecycle and state management can be simplified.
+// TODO See what part of this lifecycle and state management can be simplified.
 // Ideally, no state - only sendStart / sendStop.
 
-// TODO XXX: Review this entire code and clean it up.
+// TODO Review this entire code and clean it up.
 
 /**
  * This class is responsible for launching of containers.
  */
-public class ContainerLauncherImpl extends AbstractService implements
-    ContainerLauncher {
+public class ContainerLauncherImpl extends ContainerLauncher {
 
-  // TODO XXX Ensure the same thread is used to launch / stop the same container. Or - ensure event ordering.
+  // TODO Ensure the same thread is used to launch / stop the same container. Or - ensure event ordering.
   static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherImpl.class);
 
-  private ConcurrentHashMap<ContainerId, Container> containers =
-    new ConcurrentHashMap<ContainerId, Container>();
-  private AppContext context;
+  private final ConcurrentHashMap<ContainerId, Container> containers =
+    new ConcurrentHashMap<>();
   protected ThreadPoolExecutor launcherPool;
   protected static final int INITIAL_POOL_SIZE = 10;
-  private int limitOnPoolSize;
+  private final int limitOnPoolSize;
+  private final Configuration conf;
   private Thread eventHandlingThread;
-  protected BlockingQueue<NMCommunicatorEvent> eventQueue =
-      new LinkedBlockingQueue<NMCommunicatorEvent>();
-  private Clock clock;
+  protected BlockingQueue<ContainerOp> eventQueue = new LinkedBlockingQueue<>();
   private ContainerManagementProtocolProxy cmProxy;
   private AtomicBoolean serviceStopped = new AtomicBoolean(false);
 
-  private Container getContainer(NMCommunicatorEvent event) {
-    ContainerId id = event.getContainerId();
+  private Container getContainer(ContainerOp event) {
+    ContainerId id = event.getBaseOperation().getContainerId();
     Container c = containers.get(id);
     if(c == null) {
-      c = new Container(event.getContainerId(),
-          event.getNodeId().toString(), event.getContainerToken());
+      c = new Container(id,
+          event.getBaseOperation().getNodeId().toString(), event.getBaseOperation().getContainerToken());
       Container old = containers.putIfAbsent(id, c);
       if(old != null) {
         c = old;
@@ -111,6 +100,7 @@ public class ContainerLauncherImpl extends AbstractService implements
     }
   }
 
+
   private static enum ContainerState {
     PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
   }
@@ -135,7 +125,7 @@ public class ContainerLauncherImpl extends AbstractService implements
     }
 
     @SuppressWarnings("unchecked")
-    public synchronized void launch(NMCommunicatorLaunchRequestEvent event) {
+    public synchronized void launch(ContainerLaunchRequest event) {
       LOG.info("Launching Container with Id: " + event.getContainerId());
       if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
         state = ContainerState.DONE;
@@ -171,13 +161,7 @@ public class ContainerLauncherImpl extends AbstractService implements
 
         // after launching, send launched event to task attempt to move
         // it from ASSIGNED to RUNNING state
-        context.getEventHandler().handle(
-            new AMContainerEventLaunched(containerID));
-        ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
-            containerID, clock.getTime(), context.getApplicationAttemptId());
-        context.getHistoryHandler().handle(new DAGHistoryEvent(
-            null, lEvt));
-
+        getContext().containerLaunched(containerID);
         this.state = ContainerState.RUNNING;
       } catch (Throwable t) {
         String message = "Container launch failed for " + containerID + " : "
@@ -217,16 +201,14 @@ public class ContainerLauncherImpl extends AbstractService implements
 
             // If stopContainer returns without an error, assuming the stop made
             // it over to the NodeManager.
-          context.getEventHandler().handle(
-              new AMContainerEvent(containerID, AMContainerEventType.C_NM_STOP_SENT));
+          getContext().containerStopRequested(containerID);
         } catch (Throwable t) {
 
           // ignore the cleanup failure
           String message = "cleanup failed for container "
             + this.containerID + " : "
             + ExceptionUtils.getStackTrace(t);
-          context.getEventHandler().handle(
-              new AMContainerEventStopFailed(containerID, message));
+          getContext().containerStopFailed(containerID, message);
           LOG.warn(message);
           this.state = ContainerState.DONE;
           return;
@@ -240,15 +222,9 @@ public class ContainerLauncherImpl extends AbstractService implements
     }
   }
 
-  public ContainerLauncherImpl(AppContext context) {
-    super(ContainerLauncherImpl.class.getName());
-    this.context = context;
-    this.clock = context.getClock();
-  }
-
-  @Override
-  public synchronized void serviceInit(Configuration config) {
-    Configuration conf = new Configuration(config);
+  public ContainerLauncherImpl(ContainerLauncherContext containerLauncherContext) {
+    super(ContainerLauncherImpl.class.getName(), containerLauncherContext);
+    this.conf = new Configuration(containerLauncherContext.getInitialConfiguration());
     conf.setInt(
         CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
         0);
@@ -262,7 +238,7 @@ public class ContainerLauncherImpl extends AbstractService implements
   public void serviceStart() {
     // pass a copy of config to ContainerManagementProtocolProxy until YARN-3497 is fixed
     cmProxy =
-        new ContainerManagementProtocolProxy(new Configuration(getConfig()));
+        new ContainerManagementProtocolProxy(conf);
 
     ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
         "ContainerLauncher #%d").setDaemon(true).build();
@@ -275,7 +251,7 @@ public class ContainerLauncherImpl extends AbstractService implements
     eventHandlingThread = new Thread() {
       @Override
       public void run() {
-        NMCommunicatorEvent event = null;
+        ContainerOp event = null;
         while (!Thread.currentThread().isInterrupted()) {
           try {
             event = eventQueue.take();
@@ -293,9 +269,8 @@ public class ContainerLauncherImpl extends AbstractService implements
 
             // nodes where containers will run at *this* point of time. This is
             // *not* the cluster size and doesn't need to be.
-            int yarnSourceIndex =
-                context.getTaskScheduerIdentifier(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
-            int numNodes = context.getNodeTracker().getNumNodes(yarnSourceIndex);
+            int numNodes = getContext().getNumNodes(
+                TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
             int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
 
             if (poolSize < idealPoolSize) {
@@ -347,7 +322,7 @@ public class ContainerLauncherImpl extends AbstractService implements
     }
   }
 
-  protected EventProcessor createEventProcessor(NMCommunicatorEvent event) {
+  protected EventProcessor createEventProcessor(ContainerOp event) {
     return new EventProcessor(event);
   }
 
@@ -361,32 +336,29 @@ public class ContainerLauncherImpl extends AbstractService implements
    * Setup and start the container on remote nodemanager.
    */
   class EventProcessor implements Runnable {
-    private NMCommunicatorEvent event;
+    private ContainerOp event;
 
-    EventProcessor(NMCommunicatorEvent event) {
+    EventProcessor(ContainerOp event) {
       this.event = event;
     }
 
     @Override
     public void run() {
-      LOG.info("Processing the event " + event.toString());
+      LOG.info("Processing operation {}", event.toString());
 
       // Load ContainerManager tokens before creating a connection.
       // TODO: Do it only once per NodeManager.
-      ContainerId containerID = event.getContainerId();
+      ContainerId containerID = event.getBaseOperation().getContainerId();
 
       Container c = getContainer(event);
-      switch(event.getType()) {
-
-      case CONTAINER_LAUNCH_REQUEST:
-        NMCommunicatorLaunchRequestEvent launchEvent
-            = (NMCommunicatorLaunchRequestEvent) event;
-        c.launch(launchEvent);
-        break;
-
-      case CONTAINER_STOP_REQUEST:
-        c.kill();
-        break;
+      switch(event.getOpType()) {
+        case LAUNCH_REQUEST:
+          ContainerLaunchRequest launchRequest = event.getLaunchRequest();
+          c.launch(launchRequest);
+          break;
+        case STOP_REQUEST:
+          c.kill();
+          break;
       }
       removeContainerIfDone(containerID);
     }
@@ -408,13 +380,23 @@ public class ContainerLauncherImpl extends AbstractService implements
   void sendContainerLaunchFailedMsg(ContainerId containerId,
       String message) {
     LOG.error(message);
-    context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, message));
+    getContext().containerLaunchFailed(containerId, message);
+  }
+
+
+  @Override
+  public void launchContainer(ContainerLaunchRequest launchRequest) {
+    try {
+      eventQueue.put(new ContainerOp(ContainerOp.OPType.LAUNCH_REQUEST, launchRequest));
+    } catch (InterruptedException e) {
+      throw new TezUncheckedException(e);
+    }
   }
 
   @Override
-  public void handle(NMCommunicatorEvent event) {
+  public void stopContainer(ContainerStopRequest stopRequest) {
     try {
-      eventQueue.put(event);
+      eventQueue.put(new ContainerOp(ContainerOp.OPType.STOP_REQUEST, stopRequest));
     } catch (InterruptedException e) {
       throw new TezUncheckedException(e);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index db145f4..9f741cf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -23,12 +23,18 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ContainerLauncherContextImpl;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,11 +44,15 @@ public class ContainerLauncherRouter extends AbstractService
   static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherImpl.class);
 
   private final ContainerLauncher containerLaunchers[];
+  private final ContainerLauncherContext containerLauncherContexts[];
+  private final AppContext appContext;
 
   @VisibleForTesting
-  public ContainerLauncherRouter(ContainerLauncher containerLauncher) {
+  public ContainerLauncherRouter(ContainerLauncher containerLauncher, AppContext context) {
     super(ContainerLauncherRouter.class.getName());
+    this.appContext = context;
     containerLaunchers = new ContainerLauncher[] {containerLauncher};
+    containerLauncherContexts = new ContainerLauncherContext[] {containerLauncher.getContext()};
   }
 
   // Accepting conf to setup final parameters, if required.
@@ -53,6 +63,7 @@ public class ContainerLauncherRouter extends AbstractService
                                  boolean isPureLocalMode) throws UnknownHostException {
     super(ContainerLauncherRouter.class.getName());
 
+    this.appContext = context;
     if (containerLauncherClassIdentifiers == null || containerLauncherClassIdentifiers.length == 0) {
       if (isPureLocalMode) {
         containerLauncherClassIdentifiers =
@@ -62,16 +73,21 @@ public class ContainerLauncherRouter extends AbstractService
             new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
       }
     }
+    containerLauncherContexts = new ContainerLauncherContext[containerLauncherClassIdentifiers.length];
     containerLaunchers = new ContainerLauncher[containerLauncherClassIdentifiers.length];
 
+
     for (int i = 0; i < containerLauncherClassIdentifiers.length; i++) {
+      ContainerLauncherContext containerLauncherContext = new ContainerLauncherContextImpl(context, taskAttemptListener);
+      containerLauncherContexts[i] = containerLauncherContext;
       containerLaunchers[i] = createContainerLauncher(containerLauncherClassIdentifiers[i], context,
-          taskAttemptListener, workingDirectory, isPureLocalMode, conf);
+          containerLauncherContext, taskAttemptListener, workingDirectory, isPureLocalMode, conf);
     }
   }
 
   private ContainerLauncher createContainerLauncher(String containerLauncherClassIdentifier,
                                                     AppContext context,
+                                                    ContainerLauncherContext containerLauncherContext,
                                                     TaskAttemptListener taskAttemptListener,
                                                     String workingDirectory,
                                                     boolean isPureLocalMode,
@@ -79,12 +95,15 @@ public class ContainerLauncherRouter extends AbstractService
       UnknownHostException {
     if (containerLauncherClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
       LOG.info("Creating DefaultContainerLauncher");
-      return new ContainerLauncherImpl(context);
+      return new ContainerLauncherImpl(containerLauncherContext);
     } else if (containerLauncherClassIdentifier
         .equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
       LOG.info("Creating LocalContainerLauncher");
+      // TODO Post TEZ-2003. LocalContainerLauncher is special cased, since it makes use of
+      // extensive internals which are only available at runtime. Will likely require
+      // some kind of runtime binding of parameters in the payload to work correctly.
       return
-          new LocalContainerLauncher(context, taskAttemptListener, workingDirectory, isPureLocalMode);
+          new LocalContainerLauncher(containerLauncherContext, context, taskAttemptListener, workingDirectory, isPureLocalMode);
     } else {
       LOG.info("Creating container launcher : " + containerLauncherClassIdentifier);
       Class<? extends ContainerLauncher> containerLauncherClazz =
@@ -92,9 +111,9 @@ public class ContainerLauncherRouter extends AbstractService
               containerLauncherClassIdentifier);
       try {
         Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
-            .getConstructor(AppContext.class, Configuration.class, TaskAttemptListener.class);
+            .getConstructor(ContainerLauncherContext.class);
         ctor.setAccessible(true);
-        return ctor.newInstance(context, conf, taskAttemptListener);
+        return ctor.newInstance(containerLauncherContext);
       } catch (NoSuchMethodException e) {
         throw new TezUncheckedException(e);
       } catch (InvocationTargetException e) {
@@ -141,6 +160,25 @@ public class ContainerLauncherRouter extends AbstractService
 
   @Override
   public void handle(NMCommunicatorEvent event) {
-    containerLaunchers[event.getLauncherId()].handle(event);
+    int launcherId = event.getLauncherId();
+    String schedulerName = appContext.getTaskSchedulerName(event.getSchedulerId());
+    String taskCommName = appContext.getTaskCommunicatorName(event.getTaskCommId());
+    switch (event.getType()) {
+      case CONTAINER_LAUNCH_REQUEST:
+        NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event;
+        ContainerLaunchRequest launchRequest =
+            new ContainerLaunchRequest(launchEvent.getNodeId(), launchEvent.getContainerId(),
+                launchEvent.getContainerToken(), launchEvent.getContainerLaunchContext(),
+                launchEvent.getContainer(), schedulerName,
+                taskCommName);
+        containerLaunchers[launcherId].launchContainer(launchRequest);
+        break;
+      case CONTAINER_STOP_REQUEST:
+        ContainerStopRequest stopRequest =
+            new ContainerStopRequest(event.getNodeId(), event.getContainerId(),
+                event.getContainerToken(), schedulerName, taskCommName);
+        containerLaunchers[launcherId].stopContainer(stopRequest);
+        break;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerOp.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerOp.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerOp.java
new file mode 100644
index 0000000..c62de66
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerOp.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncherOperationBase;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+
+@InterfaceAudience.Private
+public class ContainerOp {
+  enum OPType {
+    LAUNCH_REQUEST, STOP_REQUEST
+  }
+
+  final ContainerLauncherOperationBase command;
+  final OPType opType;
+
+  public ContainerOp(OPType opType, ContainerLauncherOperationBase command) {
+    this.opType = opType;
+    this.command = command;
+  }
+
+  public OPType getOpType() {
+    return opType;
+  }
+
+  public ContainerLauncherOperationBase getBaseOperation() {
+    return command;
+  }
+
+  public ContainerLaunchRequest getLaunchRequest() {
+    Preconditions.checkState(opType == OPType.LAUNCH_REQUEST);
+    return (ContainerLaunchRequest) command;
+  }
+
+  public ContainerStopRequest getStopRequest() {
+    Preconditions.checkState(opType == OPType.STOP_REQUEST);
+    return (ContainerStopRequest) command;
+  }
+
+  @Override
+  public String toString() {
+    return "ContainerOp{" +
+        "opType=" + opType +
+        ", command=" + command +
+        '}';
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index fe23409..a1b8e29 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -44,11 +44,15 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
@@ -60,17 +64,6 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
-import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
-import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
-import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
-import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed;
-import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
-import org.apache.tez.dag.app.rm.container.AMContainerEventType;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
-import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
@@ -82,17 +75,17 @@ import org.apache.tez.runtime.task.TezChild;
  * Since all (sub)tasks share the same local directory, they must be executed
  * sequentially in order to avoid creating/deleting the same files/dirs.
  */
-public class LocalContainerLauncher extends AbstractService implements
-  ContainerLauncher {
+public class LocalContainerLauncher extends ContainerLauncher {
 
   private static final Logger LOG = LoggerFactory.getLogger(LocalContainerLauncher.class);
+
   private final AppContext context;
   private final AtomicBoolean serviceStopped = new AtomicBoolean(false);
   private final String workingDirectory;
   private final TaskAttemptListener tal;
   private final Map<String, String> localEnv;
   private final ExecutionContext executionContext;
-  private int numExecutors;
+  private final int numExecutors;
   private final boolean isPureLocalMode;
 
   private final ConcurrentHashMap<ContainerId, RunningTaskCallback>
@@ -102,23 +95,25 @@ public class LocalContainerLauncher extends AbstractService implements
   private final ExecutorService callbackExecutor = Executors.newFixedThreadPool(1,
       new ThreadFactoryBuilder().setDaemon(true).setNameFormat("CallbackExecutor").build());
 
-  private BlockingQueue<NMCommunicatorEvent> eventQueue =
-      new LinkedBlockingQueue<NMCommunicatorEvent>();
+  private BlockingQueue<ContainerOp> eventQueue = new LinkedBlockingQueue<>();
   private Thread eventHandlingThread;
 
 
   private ListeningExecutorService taskExecutorService;
 
 
-
-  public LocalContainerLauncher(AppContext context,
+  public LocalContainerLauncher(ContainerLauncherContext containerLauncherContext,
+                                AppContext context,
                                 TaskAttemptListener taskAttemptListener,
                                 String workingDirectory,
                                 boolean isPureLocalMode) throws UnknownHostException {
-    super(LocalContainerLauncher.class.getName());
+    // TODO Post TEZ-2003. Most of this information is dynamic and only available after the AM
+    // starts up. It's not possible to set these up via a static payload.
+    // Will need some kind of mechanism to dynamically crate payloads / bind to parameters
+    // after the AM starts up.
+    super(LocalContainerLauncher.class.getName(), containerLauncherContext);
     this.context = context;
     this.tal = taskAttemptListener;
-
     this.workingDirectory = workingDirectory;
     this.isPureLocalMode = isPureLocalMode;
     if (isPureLocalMode) {
@@ -133,11 +128,8 @@ public class LocalContainerLauncher extends AbstractService implements
     String host = isPureLocalMode ? InetAddress.getLocalHost().getHostName() :
         System.getenv(Environment.NM_HOST.name());
     executionContext = new ExecutionContextImpl(host);
-  }
 
-  @Override
-  public synchronized void serviceInit(Configuration conf) {
-    numExecutors = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
+    numExecutors = getContext().getInitialConfiguration().getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
         TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT);
     Preconditions.checkState(numExecutors >=1, "Must have at least 1 executor");
     ExecutorService rawExecutor = Executors.newFixedThreadPool(numExecutors,
@@ -169,20 +161,22 @@ public class LocalContainerLauncher extends AbstractService implements
     callbackExecutor.shutdownNow();
   }
 
+
+
   // Thread to monitor the queue of incoming NMCommunicator events
   private class TezSubTaskRunner implements Runnable {
     @Override
     public void run() {
       while (!Thread.currentThread().isInterrupted() && !serviceStopped.get()) {
-        NMCommunicatorEvent event;
+        ContainerOp event;
         try {
           event = eventQueue.take();
-          switch (event.getType()) {
-            case CONTAINER_LAUNCH_REQUEST:
-              launch((NMCommunicatorLaunchRequestEvent) event);
+          switch (event.getOpType()) {
+            case LAUNCH_REQUEST:
+              launch(event.getLaunchRequest());
               break;
-            case CONTAINER_STOP_REQUEST:
-              stop((NMCommunicatorStopRequestEvent)event);
+            case STOP_REQUEST:
+              stop(event.getStopRequest());
               break;
           }
         } catch (InterruptedException e) {
@@ -200,7 +194,7 @@ public class LocalContainerLauncher extends AbstractService implements
 
   @SuppressWarnings("unchecked")
   void sendContainerLaunchFailedMsg(ContainerId containerId, String message) {
-    context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, message));
+    getContext().containerLaunchFailed(containerId, message);
   }
 
   private void handleLaunchFailed(Throwable t, ContainerId containerId) {
@@ -215,16 +209,17 @@ public class LocalContainerLauncher extends AbstractService implements
   }
 
   //launch tasks
-  private void launch(NMCommunicatorLaunchRequestEvent event) {
+  private void launch(ContainerLaunchRequest event) {
 
     String tokenIdentifier = context.getApplicationID().toString();
     try {
       TezChild tezChild;
       try {
+        int taskCommId = context.getTaskCommunicatorIdentifier(event.getTaskCommunicatorName());
         tezChild =
             createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier,
                 context.getApplicationAttemptId().getAttemptId(), context.getLocalDirs(),
-                ((TezTaskCommunicatorImpl)tal.getTaskCommunicator(event.getTaskCommId())).getUmbilical(),
+                ((TezTaskCommunicatorImpl)tal.getTaskCommunicator(taskCommId)).getUmbilical(),
                 TezCommonUtils.parseCredentialsBytes(event.getContainerLaunchContext().getTokens().array()));
       } catch (InterruptedException e) {
         handleLaunchFailed(e, event.getContainerId());
@@ -238,7 +233,7 @@ public class LocalContainerLauncher extends AbstractService implements
       }
       ListenableFuture<TezChild.ContainerExecutionResult> runningTaskFuture =
           taskExecutorService.submit(createSubTask(tezChild, event.getContainerId()));
-      RunningTaskCallback callback = new RunningTaskCallback(context, event.getContainerId());
+      RunningTaskCallback callback = new RunningTaskCallback(event.getContainerId());
       runningContainers.put(event.getContainerId(), callback);
       Futures.addCallback(runningTaskFuture, callback, callbackExecutor);
     } catch (RejectedExecutionException e) {
@@ -246,7 +241,7 @@ public class LocalContainerLauncher extends AbstractService implements
     }
   }
 
-  private void stop(NMCommunicatorStopRequestEvent event) {
+  private void stop(ContainerStopRequest event) {
     // A stop_request will come in when a task completes and reports back or a preemption decision
     // is made. Currently the LocalTaskScheduler does not support preemption. Also preemption
     // will not work in local mode till Tez supports task preemption instead of container preemption.
@@ -263,18 +258,15 @@ public class LocalContainerLauncher extends AbstractService implements
       // This will need to be fixed once interrupting tasks is supported.
     }
     // Send this event to maintain regular control flow. This isn't of much use though.
-    context.getEventHandler().handle(
-        new AMContainerEvent(event.getContainerId(), AMContainerEventType.C_NM_STOP_SENT));
+    getContext().containerStopRequested(event.getContainerId());
   }
 
   private class RunningTaskCallback
       implements FutureCallback<TezChild.ContainerExecutionResult> {
 
-    private final AppContext appContext;
     private final ContainerId containerId;
 
-    RunningTaskCallback(AppContext appContext, ContainerId containerId) {
-      this.appContext = appContext;
+    RunningTaskCallback(ContainerId containerId) {
       this.containerId = containerId;
     }
 
@@ -286,16 +278,16 @@ public class LocalContainerLauncher extends AbstractService implements
           result.getExitStatus() ==
               TezChild.ContainerExecutionResult.ExitStatus.ASKED_TO_DIE) {
         LOG.info("Container: " + containerId + " completed successfully");
-        appContext.getEventHandler().handle(
-            new AMContainerEventCompleted(containerId, result.getExitStatus().getExitCode(),
-                null, TaskAttemptTerminationCause.CONTAINER_EXITED));
+        getContext()
+            .containerCompleted(containerId, result.getExitStatus().getExitCode(), null,
+                TaskAttemptEndReason.CONTAINER_EXITED);
       } else {
         LOG.info("Container: " + containerId + " completed but with errors");
-        appContext.getEventHandler().handle(
-            new AMContainerEventCompleted(containerId, result.getExitStatus().getExitCode(),
-                result.getErrorMessage() == null ?
-                    (result.getThrowable() == null ? null : result.getThrowable().getMessage()) :
-                    result.getErrorMessage(), TaskAttemptTerminationCause.APPLICATION_ERROR));
+        getContext().containerCompleted(
+            containerId, result.getExitStatus().getExitCode(),
+            result.getErrorMessage() == null ?
+                (result.getThrowable() == null ? null : result.getThrowable().getMessage()) :
+                result.getErrorMessage(), TaskAttemptEndReason.APPLICATION_ERROR);
       }
     }
 
@@ -307,16 +299,14 @@ public class LocalContainerLauncher extends AbstractService implements
       if (!(t instanceof CancellationException)) {
         LOG.info("Container: " + containerId + ": Execution Failed: ", t);
         // Inform of failure with exit code 1.
-        appContext.getEventHandler()
-            .handle(new AMContainerEventCompleted(containerId,
-                TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE.getExitCode(),
-                t.getMessage(), TaskAttemptTerminationCause.APPLICATION_ERROR));
+        getContext().containerCompleted(containerId,
+            TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE.getExitCode(),
+            t.getMessage(), TaskAttemptEndReason.APPLICATION_ERROR);
       } else {
         LOG.info("Ignoring CancellationException - triggered by LocalContainerLauncher");
-        appContext.getEventHandler()
-            .handle(new AMContainerEventCompleted(containerId,
-                TezChild.ContainerExecutionResult.ExitStatus.SUCCESS.getExitCode(),
-                "CancellationException", TaskAttemptTerminationCause.CONTAINER_EXITED));
+        getContext().containerCompleted(containerId,
+            TezChild.ContainerExecutionResult.ExitStatus.SUCCESS.getExitCode(),
+            "CancellationException", TaskAttemptEndReason.COMMUNICATION_ERROR.CONTAINER_EXITED);
       }
     }
   }
@@ -334,12 +324,7 @@ public class LocalContainerLauncher extends AbstractService implements
         // TezTaskRunner needs to be fixed to ensure this.
         Thread.interrupted();
         // Inform about the launch request now that the container has been allocated a thread to execute in.
-        context.getEventHandler().handle(new AMContainerEventLaunched(containerId));
-        ContainerLaunchedEvent lEvt =
-            new ContainerLaunchedEvent(containerId, context.getClock().getTime(),
-                context.getApplicationAttemptId());
-
-        context.getHistoryHandler().handle(new DAGHistoryEvent(context.getCurrentDAGID(), lEvt));
+        getContext().containerLaunched(containerId);
         return tezChild.run();
       }
     };
@@ -368,11 +353,19 @@ public class LocalContainerLauncher extends AbstractService implements
   }
 
 
+  @Override
+  public void launchContainer(ContainerLaunchRequest launchRequest) {
+    try {
+      eventQueue.put(new ContainerOp(ContainerOp.OPType.LAUNCH_REQUEST, launchRequest));
+    } catch (InterruptedException e) {
+      throw new TezUncheckedException(e);
+    }
+  }
 
   @Override
-  public void handle(NMCommunicatorEvent event) {
+  public void stopContainer(ContainerStopRequest stopRequest) {
     try {
-      eventQueue.put(event);
+      eventQueue.put(new ContainerOp(ContainerOp.OPType.STOP_REQUEST, stopRequest));
     } catch (InterruptedException e) {
       throw new TezUncheckedException(e);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
index a775948..33763e7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
@@ -18,7 +18,7 @@
 package org.apache.tez.dag.app.rm;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.records.TezTaskAttemptID;

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index a601506..365517e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
index f86894f..dc50c37 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
@@ -29,14 +29,19 @@ public class NMCommunicatorEvent extends AbstractEvent<NMCommunicatorEventType>
   private final NodeId nodeId;
   private final Token containerToken;
   private final int launcherId;
+  private final int schedulerId;
+  private final int taskCommId;
 
   public NMCommunicatorEvent(ContainerId containerId, NodeId nodeId,
-      Token containerToken, NMCommunicatorEventType type, int launcherId) {
+                             Token containerToken, NMCommunicatorEventType type, int launcherId,
+                             int schedulerId, int taskCommId) {
     super(type);
     this.containerId = containerId;
     this.nodeId = nodeId;
     this.containerToken = containerToken;
     this.launcherId = launcherId;
+    this.schedulerId = schedulerId;
+    this.taskCommId = taskCommId;
   }
 
   public ContainerId getContainerId() {
@@ -55,9 +60,18 @@ public class NMCommunicatorEvent extends AbstractEvent<NMCommunicatorEventType>
     return launcherId;
   }
 
+  public int getSchedulerId() {
+    return schedulerId;
+  }
+
+  public int getTaskCommId() {
+    return taskCommId;
+  }
+
   public String toSrting() {
     return super.toString() + " for container " + containerId + ", nodeId: "
-        + nodeId + ", launcherId: " + launcherId;
+        + nodeId + ", launcherId: " + launcherId + ", schedulerId=" + schedulerId +
+        ", taskCommId=" + taskCommId;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
index a38345c..c57b6be 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
@@ -26,15 +26,14 @@ public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent {
   private final ContainerLaunchContext clc;
   private final Container container;
   // The task communicator index for the specific container being launched.
-  private final int taskCommId;
 
   public NMCommunicatorLaunchRequestEvent(ContainerLaunchContext clc,
-      Container container, int launcherId, int taskCommId) {
+      Container container, int launcherId, int schedulerId, int taskCommId) {
     super(container.getId(), container.getNodeId(), container
-        .getContainerToken(), NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST, launcherId);
+        .getContainerToken(), NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST,
+        launcherId, schedulerId, taskCommId);
     this.clc = clc;
     this.container = container;
-    this.taskCommId = taskCommId;
   }
 
   public ContainerLaunchContext getContainerLaunchContext() {
@@ -45,10 +44,6 @@ public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent {
     return container;
   }
 
-  public int getTaskCommId() {
-    return taskCommId;
-  }
-
   @Override
   public boolean equals(Object o) {
     if (this == o) {

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
index c9b5c44..352f450 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
@@ -25,9 +25,9 @@ import org.apache.hadoop.yarn.api.records.Token;
 public class NMCommunicatorStopRequestEvent extends NMCommunicatorEvent {
 
   public NMCommunicatorStopRequestEvent(ContainerId containerId, NodeId nodeId,
-      Token containerToken, int launcherId) {
+      Token containerToken, int launcherId, int schedulerId, int taskCommId) {
     super(containerId, nodeId, containerToken,
-        NMCommunicatorEventType.CONTAINER_STOP_REQUEST, launcherId);
+        NMCommunicatorEventType.CONTAINER_STOP_REQUEST, launcherId, schedulerId, taskCommId);
   }
 
 }