You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/26 01:48:54 UTC

[7/7] tez git commit: TEZ-2708. Rename classes and variables post TEZ-2003 changes. (sseth)

TEZ-2708. Rename classes and variables post TEZ-2003 changes. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8b278ea8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8b278ea8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8b278ea8

Branch: refs/heads/master
Commit: 8b278ea84f4c64e7144c07fa79b38a6a719541d2
Parents: dc0ee01
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Aug 25 16:48:00 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Aug 25 16:48:00 2015 -0700

----------------------------------------------------------------------
 tez-dag/findbugs-exclude.xml                    |  15 +-
 .../java/org/apache/tez/dag/app/AppContext.java |   4 +-
 .../dag/app/ContainerLauncherContextImpl.java   |   4 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 113 +--
 .../apache/tez/dag/app/TaskAttemptListener.java |  46 --
 .../dag/app/TaskAttemptListenerImpTezDag.java   | 449 -----------
 .../dag/app/TaskCommunicatorContextImpl.java    |  22 +-
 .../tez/dag/app/TaskCommunicatorManager.java    | 449 +++++++++++
 .../app/TaskCommunicatorManagerInterface.java   |  46 ++
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  10 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |   8 +-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  10 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  10 +-
 .../dag/app/launcher/ContainerLauncherImpl.java | 410 ----------
 .../app/launcher/ContainerLauncherManager.java  | 217 +++++
 .../app/launcher/ContainerLauncherRouter.java   | 215 -----
 .../app/launcher/LocalContainerLauncher.java    |   8 +-
 .../app/launcher/TezContainerLauncherImpl.java  | 410 ++++++++++
 .../tez/dag/app/rm/ContainerLauncherEvent.java  | 116 +++
 .../dag/app/rm/ContainerLauncherEventType.java  |  25 +
 .../rm/ContainerLauncherLaunchRequestEvent.java |  79 ++
 .../rm/ContainerLauncherStopRequestEvent.java   |  34 +
 .../tez/dag/app/rm/NMCommunicatorEvent.java     | 115 ---
 .../tez/dag/app/rm/NMCommunicatorEventType.java |  25 -
 .../rm/NMCommunicatorLaunchRequestEvent.java    |  78 --
 .../app/rm/NMCommunicatorStopRequestEvent.java  |  33 -
 .../dag/app/rm/TaskSchedulerContextImpl.java    |  29 +-
 .../dag/app/rm/TaskSchedulerEventHandler.java   | 784 ------------------
 .../tez/dag/app/rm/TaskSchedulerManager.java    | 786 +++++++++++++++++++
 .../dag/app/rm/container/AMContainerImpl.java   |  26 +-
 .../dag/app/rm/container/AMContainerMap.java    |   6 +-
 .../apache/tez/dag/app/MockDAGAppMaster.java    |  23 +-
 .../tez/dag/app/TestMockDAGAppMaster.java       |   2 +-
 .../app/TestTaskAttemptListenerImplTezDag.java  | 426 ----------
 .../app/TestTaskAttemptListenerImplTezDag2.java | 137 ----
 .../app/TestTaskCommunicatorContextImpl.java    |   2 +-
 .../dag/app/TestTaskCommunicatorManager.java    |   2 +-
 .../dag/app/TestTaskCommunicatorManager1.java   | 425 ++++++++++
 .../dag/app/TestTaskCommunicatorManager2.java   | 136 ++++
 .../apache/tez/dag/app/dag/impl/TestCommit.java |   6 +-
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |  12 +-
 .../tez/dag/app/dag/impl/TestDAGRecovery.java   |   4 +-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |  58 +-
 .../app/dag/impl/TestTaskAttemptRecovery.java   |   4 +-
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |  16 +-
 .../tez/dag/app/dag/impl/TestTaskRecovery.java  |   4 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  31 +-
 .../tez/dag/app/dag/impl/TestVertexImpl2.java   |   4 +-
 .../dag/app/dag/impl/TestVertexRecovery.java    |  10 +-
 .../launcher/TestContainerLauncherManager.java  | 359 +++++++++
 .../launcher/TestContainerLauncherRouter.java   | 359 ---------
 .../tez/dag/app/rm/TestContainerReuse.java      | 310 ++++----
 .../app/rm/TestTaskSchedulerEventHandler.java   | 707 -----------------
 .../dag/app/rm/TestTaskSchedulerHelpers.java    |  14 +-
 .../dag/app/rm/TestTaskSchedulerManager.java    | 708 +++++++++++++++++
 .../dag/app/rm/container/TestAMContainer.java   |  24 +-
 .../app/rm/container/TestAMContainerMap.java    |   6 +-
 .../tez/dag/app/rm/node/TestAMNodeTracker.java  |  38 +-
 .../tez/service/impl/ContainerRunnerImpl.java   |  23 +-
 .../apache/tez/runtime/task/TezTaskRunner.java  | 451 -----------
 .../tez/runtime/task/TestTaskExecution.java     | 362 ---------
 .../tez/runtime/task/TestTaskExecution2.java    |   2 +-
 62 files changed, 4220 insertions(+), 5027 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index 9d15035..6db3b7c 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -87,13 +87,13 @@
   </Match>
 
   <Match>
-    <Class name="~org\.apache\.tez\.dag\.app\.rm\.TaskSchedulerEventHandler"/>
+    <Class name="~org\.apache\.tez\.dag\.app\.rm\.TaskSchedulerManager"/>
     <Bug pattern="BC_UNCONFIRMED_CAST"/>
   </Match>
 
   <Match>
-    <Class name="org.apache.tez.dag.app.launcher.ContainerLauncherRouter" />
-    <Method name="handle" params="org.apache.tez.dag.app.rm.NMCommunicatorEvent" returns="void" />
+    <Class name="org.apache.tez.dag.app.launcher.ContainerLauncherManager" />
+    <Method name="handle" params="org.apache.tez.dag.app.rm.ContainerLauncherEvent" returns="void" />
     <Bug pattern="BC_UNCONFIRMED_CAST" />
   </Match>
 
@@ -151,7 +151,7 @@
       <Field name="context"/>
       <Field name="currentDAG"/>
       <Field name="state"/>
-      <Field name="taskSchedulerEventHandler"/>
+      <Field name="taskSchedulerManager"/>
       <Field name="versionMismatch"/>
       <Field name="versionMismatchDiagnostics"/>
       <Field name="containers"/>
@@ -165,13 +165,6 @@
     <Bug pattern="IS2_INCONSISTENT_SYNC"/>
   </Match>
 
-  <!-- TEZ-1955 -->
-  <Match>
-    <Class name="org.apache.tez.dag.app.rm.TaskSchedulerEventHandler"/>
-    <Field name="taskScheduler"/>
-    <Bug pattern="IS2_INCONSISTENT_SYNC"/>
-  </Match>
-
   <!-- TEZ-1956 -->
   <Match>
     <Class name="org.apache.tez.dag.app.rm.YarnTaskSchedulerService"/>

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 516fcef..657267b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
+import org.apache.tez.dag.app.rm.TaskSchedulerManager;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
 import org.apache.tez.dag.app.rm.node.AMNodeTracker;
 import org.apache.tez.common.security.ACLManager;
@@ -88,7 +88,7 @@ public interface AppContext {
 
   AMNodeTracker getNodeTracker();
 
-  TaskSchedulerEventHandler getTaskScheduler();
+  TaskSchedulerManager getTaskScheduler();
 
   boolean isSession();
 

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
index 3a2efc5..20bfd13 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
@@ -33,10 +33,10 @@ import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
 public class ContainerLauncherContextImpl implements ContainerLauncherContext {
 
   private final AppContext context;
-  private final TaskAttemptListener tal;
+  private final TaskCommunicatorManagerInterface tal;
   private final UserPayload initialUserPayload;
 
-  public ContainerLauncherContextImpl(AppContext appContext, TaskAttemptListener tal, UserPayload initialUserPayload) {
+  public ContainerLauncherContextImpl(AppContext appContext, TaskCommunicatorManagerInterface tal, UserPayload initialUserPayload) {
     this.context = appContext;
     this.tal = tal;
     this.initialUserPayload = initialUserPayload;

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 84b3095..34e7c2a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -40,7 +40,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -149,10 +148,10 @@ import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
-import org.apache.tez.dag.app.launcher.ContainerLauncherRouter;
+import org.apache.tez.dag.app.launcher.ContainerLauncherManager;
 import org.apache.tez.dag.app.rm.AMSchedulerEventType;
-import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
-import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
+import org.apache.tez.dag.app.rm.ContainerLauncherEventType;
+import org.apache.tez.dag.app.rm.TaskSchedulerManager;
 import org.apache.tez.dag.app.rm.container.AMContainerEventType;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
 import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
@@ -236,16 +235,16 @@ public class DAGAppMaster extends AbstractService {
   private AppContext context;
   private Configuration amConf;
   private AsyncDispatcher dispatcher;
-  private ContainerLauncherRouter containerLauncherRouter;
+  private ContainerLauncherManager containerLauncherManager;
   private ContainerHeartbeatHandler containerHeartbeatHandler;
   private TaskHeartbeatHandler taskHeartbeatHandler;
-  private TaskAttemptListener taskAttemptListener;
+  private TaskCommunicatorManagerInterface taskCommunicatorManager;
   private JobTokenSecretManager jobTokenSecretManager =
       new JobTokenSecretManager();
   private Token<JobTokenIdentifier> sessionToken;
   private DagEventDispatcher dagEventDispatcher;
   private VertexEventDispatcher vertexEventDispatcher;
-  private TaskSchedulerEventHandler taskSchedulerEventHandler;
+  private TaskSchedulerManager taskSchedulerManager;
   private WebUIService webUIService;
   private HistoryEventHandler historyEventHandler;
   private final Map<String, LocalResource> amResources = new HashMap<String, LocalResource>();
@@ -472,13 +471,13 @@ public class DAGAppMaster extends AbstractService {
 
 
     //service to handle requests to TaskUmbilicalProtocol
-    taskAttemptListener = createTaskAttemptListener(context,
+    taskCommunicatorManager = createTaskCommunicatorManager(context,
         taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorDescriptors);
-    addIfService(taskAttemptListener, true);
+    addIfService(taskCommunicatorManager, true);
 
     containerSignatureMatcher = createContainerSignatureMatcher();
     containers = new AMContainerMap(containerHeartbeatHandler,
-        taskAttemptListener, containerSignatureMatcher, context);
+        taskCommunicatorManager, containerSignatureMatcher, context);
     addIfService(containers, true);
     dispatcher.register(AMContainerEventType.class, containers);
 
@@ -521,29 +520,30 @@ public class DAGAppMaster extends AbstractService {
 
 
 
-    this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
+    this.taskSchedulerManager = new TaskSchedulerManager(context,
         clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService,
         taskSchedulerDescriptors, isLocal);
-    addIfService(taskSchedulerEventHandler, true);
+    addIfService(taskSchedulerManager, true);
 
     if (enableWebUIService()) {
-      addIfServiceDependency(taskSchedulerEventHandler, webUIService);
+      addIfServiceDependency(taskSchedulerManager, webUIService);
     }
 
     if (isLastAMRetry) {
       LOG.info("AM will unregister as this is the last attempt"
           + ", currentAttempt=" + appAttemptID.getAttemptId()
           + ", maxAttempts=" + maxAppAttempts);
-      this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+      this.taskSchedulerManager.setShouldUnregisterFlag();
     }
 
     dispatcher.register(AMSchedulerEventType.class,
-        taskSchedulerEventHandler);
-    addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer);
+        taskSchedulerManager);
+    addIfServiceDependency(taskSchedulerManager, clientRpcServer);
 
-    this.containerLauncherRouter = createContainerLauncherRouter(containerLauncherDescriptors, isLocal);
-    addIfService(containerLauncherRouter, true);
-    dispatcher.register(NMCommunicatorEventType.class, containerLauncherRouter);
+    this.containerLauncherManager = createContainerLauncherManager(containerLauncherDescriptors,
+        isLocal);
+    addIfService(containerLauncherManager, true);
+    dispatcher.register(ContainerLauncherEventType.class, containerLauncherManager);
 
     historyEventHandler = createHistoryEventHandler(context);
     addIfService(historyEventHandler, true);
@@ -632,8 +632,8 @@ public class DAGAppMaster extends AbstractService {
   }
   
   @VisibleForTesting
-  protected TaskSchedulerEventHandler getTaskSchedulerEventHandler() {
-    return taskSchedulerEventHandler;
+  protected TaskSchedulerManager getTaskSchedulerManager() {
+    return taskSchedulerManager;
   }
 
   @VisibleForTesting
@@ -661,7 +661,7 @@ public class DAGAppMaster extends AbstractService {
         sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.INTERNAL_ERROR));
       } else {
         LOG.info("Internal Error. Finishing directly as no dag is active.");
-        this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+        this.taskSchedulerManager.setShouldUnregisterFlag();
         shutdownHandler.shutdown();
       }
       break;
@@ -673,7 +673,7 @@ public class DAGAppMaster extends AbstractService {
       System.out.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId().toString());
       if (!isSession) {
         LOG.info("Not a session, AM will unregister as DAG has completed");
-        this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+        this.taskSchedulerManager.setShouldUnregisterFlag();
         _updateLoggers(currentDAG, "_post");
         setStateOnDAGCompletion();
         LOG.info("Shutting down on completion of dag:" +
@@ -722,7 +722,7 @@ public class DAGAppMaster extends AbstractService {
               + finishEvt.getDAGState()
               + ". Error. Shutting down.");
           state = DAGAppMasterState.ERROR;
-          this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+          this.taskSchedulerManager.setShouldUnregisterFlag();
           shutdownHandler.shutdown();
           break;
         }
@@ -741,11 +741,11 @@ public class DAGAppMaster extends AbstractService {
 
             // Leaving the taskSchedulerEventHandler here for now. Doesn't generate new events.
             // However, eventually it needs to be moved out.
-            this.taskSchedulerEventHandler.dagCompleted();
+            this.taskSchedulerManager.dagCompleted();
             state = DAGAppMasterState.IDLE;
           } else {
             LOG.info("Session shutting down now.");
-            this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+            this.taskSchedulerManager.setShouldUnregisterFlag();
             if (this.historyEventHandler.hasRecoveryFailed()) {
               state = DAGAppMasterState.FAILED;
             } else {
@@ -772,8 +772,8 @@ public class DAGAppMaster extends AbstractService {
       DAGAppMasterEventDagCleanup cleanupEvent = (DAGAppMasterEventDagCleanup) event;
       LOG.info("Cleaning up DAG: name=" + cleanupEvent.getDag().getName() + ", with id=" +
           cleanupEvent.getDag().getID());
-      containerLauncherRouter.dagComplete(cleanupEvent.getDag());
-      taskAttemptListener.dagComplete(cleanupEvent.getDag());
+      containerLauncherManager.dagComplete(cleanupEvent.getDag());
+      taskCommunicatorManager.dagComplete(cleanupEvent.getDag());
       nodes.dagComplete(cleanupEvent.getDag());
       containers.dagComplete(cleanupEvent.getDag());
       TezTaskAttemptID.clearCache();
@@ -785,9 +785,9 @@ public class DAGAppMaster extends AbstractService {
       break;
     case NEW_DAG_SUBMITTED:
       // Inform sub-components that a new DAG has been submitted.
-      taskSchedulerEventHandler.dagSubmitted();
-      containerLauncherRouter.dagSubmitted();
-      taskAttemptListener.dagSubmitted();
+      taskSchedulerManager.dagSubmitted();
+      containerLauncherManager.dagSubmitted();
+      taskCommunicatorManager.dagSubmitted();
       break;
     default:
       throw new TezUncheckedException(
@@ -917,7 +917,7 @@ public class DAGAppMaster extends AbstractService {
     // create single dag
     DAGImpl newDag =
         new DAGImpl(dagId, amConf, dagPB, dispatcher.getEventHandler(),
-            taskAttemptListener, dagCredentials, clock,
+            taskCommunicatorManager, dagCredentials, clock,
             appMasterUgi.getShortUserName(),
             taskHeartbeatHandler, context);
 
@@ -1048,13 +1048,13 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
-  protected TaskAttemptListener createTaskAttemptListener(AppContext context,
-                                                          TaskHeartbeatHandler thh,
-                                                          ContainerHeartbeatHandler chh,
-                                                          List<NamedEntityDescriptor> entityDescriptors) {
-    TaskAttemptListener lis =
-        new TaskAttemptListenerImpTezDag(context, thh, chh, entityDescriptors);
-    return lis;
+  protected TaskCommunicatorManagerInterface createTaskCommunicatorManager(AppContext context,
+                                                                           TaskHeartbeatHandler thh,
+                                                                           ContainerHeartbeatHandler chh,
+                                                                           List<NamedEntityDescriptor> entityDescriptors) {
+    TaskCommunicatorManagerInterface tcm =
+        new TaskCommunicatorManager(context, thh, chh, entityDescriptors);
+    return tcm;
   }
 
   protected TaskHeartbeatHandler createTaskHeartbeatHandler(AppContext context,
@@ -1074,10 +1074,11 @@ public class DAGAppMaster extends AbstractService {
     return chh;
   }
 
-  protected ContainerLauncherRouter createContainerLauncherRouter(List<NamedEntityDescriptor> containerLauncherDescriptors,
-                                                                  boolean isLocal) throws
+  protected ContainerLauncherManager createContainerLauncherManager(
+      List<NamedEntityDescriptor> containerLauncherDescriptors,
+      boolean isLocal) throws
       UnknownHostException {
-    return new ContainerLauncherRouter(context, taskAttemptListener, workingDirectory,
+    return new ContainerLauncherManager(context, taskCommunicatorManager, workingDirectory,
         containerLauncherDescriptors, isLocal);
   }
 
@@ -1101,12 +1102,12 @@ public class DAGAppMaster extends AbstractService {
     return dispatcher;
   }
 
-  public ContainerLauncherRouter getContainerLauncherRouter() {
-    return containerLauncherRouter;
+  public ContainerLauncherManager getContainerLauncherManager() {
+    return containerLauncherManager;
   }
 
-  public TaskAttemptListener getTaskAttemptListener() {
-    return taskAttemptListener;
+  public TaskCommunicatorManagerInterface getTaskCommunicatorManager() {
+    return taskCommunicatorManager;
   }
 
   public ContainerId getAppContainerId() {
@@ -1213,7 +1214,7 @@ public class DAGAppMaster extends AbstractService {
   public void shutdownTezAM() throws TezException {
     sessionStopped.set(true);
     synchronized (this) {
-      this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+      this.taskSchedulerManager.setShouldUnregisterFlag();
       if (currentDAG != null
           && !currentDAG.isComplete()) {
         //send a DAG_KILL message
@@ -1473,8 +1474,8 @@ public class DAGAppMaster extends AbstractService {
     }
 
     @Override
-    public TaskSchedulerEventHandler getTaskScheduler() {
-      return taskSchedulerEventHandler;
+    public TaskSchedulerManager getTaskScheduler() {
+      return taskSchedulerManager;
     }
 
     @Override
@@ -1574,7 +1575,7 @@ public class DAGAppMaster extends AbstractService {
         throw new TezUncheckedException(
             "Cannot get ApplicationACLs before all services have started");
       }
-      return taskSchedulerEventHandler.getApplicationAcls();
+      return taskSchedulerManager.getApplicationAcls();
     }
 
     @Override
@@ -1805,7 +1806,7 @@ public class DAGAppMaster extends AbstractService {
 
     if (versionMismatch) {
       // Short-circuit and return as no DAG should not be run
-      this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+      this.taskSchedulerManager.setShouldUnregisterFlag();
       shutdownHandler.shutdown();
       return;
     }
@@ -1825,7 +1826,7 @@ public class DAGAppMaster extends AbstractService {
       LOG.error("Error occurred when trying to recover data from previous attempt."
           + " Shutting down AM", e);
       this.state = DAGAppMasterState.ERROR;
-      this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+      this.taskSchedulerManager.setShouldUnregisterFlag();
       shutdownHandler.shutdown();
       return;
     }
@@ -1929,7 +1930,7 @@ public class DAGAppMaster extends AbstractService {
 
 
   private void initiateStop() {
-    taskSchedulerEventHandler.initiateStop();
+    taskSchedulerManager.initiateStop();
   }
 
   @Override
@@ -1954,8 +1955,8 @@ public class DAGAppMaster extends AbstractService {
         LOG.debug("Checking whether tez scratch data dir should be deleted, deleteTezScratchData="
             + deleteTezScratchData);
       }
-      if (deleteTezScratchData && this.taskSchedulerEventHandler != null
-          && this.taskSchedulerEventHandler.hasUnregistered()) {
+      if (deleteTezScratchData && this.taskSchedulerManager != null
+          && this.taskSchedulerManager.hasUnregistered()) {
         // Delete tez scratch data dir
         if (this.tezSystemStagingDir != null) {
           try {
@@ -2208,7 +2209,7 @@ public class DAGAppMaster extends AbstractService {
         // Notify TaskScheduler that a SIGTERM has been received so that it
         // unregisters quickly with proper status
         LOG.info("DAGAppMaster received a signal. Signaling TaskScheduler");
-        appMaster.taskSchedulerEventHandler.setSignalled(true);
+        appMaster.taskSchedulerManager.setSignalled(true);
       }
 
       if (EnumSet.of(DAGAppMasterState.NEW, DAGAppMasterState.INITED,

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
deleted file mode 100644
index 761bdb0..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.serviceplugins.api.ContainerEndReason;
-import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.api.TaskCommunicator;
-import org.apache.tez.dag.app.rm.container.AMContainerTask;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-/**
- * This class listens for changes to the state of a Task.
- */
-public interface TaskAttemptListener {
-
-  void registerRunningContainer(ContainerId containerId, int taskCommId);
-
-  void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId);
-  
-  void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics);
-  
-  void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId, TaskAttemptEndReason endReason, String diagnostics);
-
-  void dagComplete(DAG dag);
-
-  void dagSubmitted();
-
-  TaskCommunicator getTaskCommunicator(int taskCommIndex);
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
deleted file mode 100644
index 185193f..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ /dev/null
@@ -1,449 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.dag.app;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.commons.collections4.ListUtils;
-import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
-import org.apache.tez.serviceplugins.api.ContainerEndReason;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
-import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
-import org.apache.tez.runtime.api.impl.EventType;
-import org.apache.tez.dag.api.event.VertexStateUpdate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.tez.common.ReflectionUtils;
-import org.apache.tez.common.TezUtilsInternal;
-import org.apache.tez.dag.api.TaskCommunicator;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
-import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.api.TaskHeartbeatResponse;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.dag.api.TaskHeartbeatRequest;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
-import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerTask;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.impl.TezEvent;
-
-
-@SuppressWarnings("unchecked")
-@InterfaceAudience.Private
-public class TaskAttemptListenerImpTezDag extends AbstractService implements
-    TaskAttemptListener {
-
-  private static final Logger LOG = LoggerFactory
-      .getLogger(TaskAttemptListenerImpTezDag.class);
-
-  private final AppContext context;
-  private final TaskCommunicator[] taskCommunicators;
-  private final TaskCommunicatorContext[] taskCommunicatorContexts;
-  protected final ServicePluginLifecycleAbstractService []taskCommunicatorServiceWrappers;
-
-  protected final TaskHeartbeatHandler taskHeartbeatHandler;
-  protected final ContainerHeartbeatHandler containerHeartbeatHandler;
-
-  private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0, 0);
-
-  private final ConcurrentMap<TezTaskAttemptID, ContainerId> registeredAttempts =
-      new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
-  private final ConcurrentMap<ContainerId, ContainerInfo> registeredContainers =
-      new ConcurrentHashMap<ContainerId, ContainerInfo>();
-
-  // Defined primarily to work around ConcurrentMaps not accepting null values
-  private static final class ContainerInfo {
-    TezTaskAttemptID taskAttemptId;
-    ContainerInfo(TezTaskAttemptID taskAttemptId) {
-      this.taskAttemptId = taskAttemptId;
-    }
-  }
-
-  private static final ContainerInfo NULL_CONTAINER_INFO = new ContainerInfo(null);
-
-
-  public TaskAttemptListenerImpTezDag(AppContext context,
-                                      TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
-                                      List<NamedEntityDescriptor> taskCommunicatorDescriptors) {
-    super(TaskAttemptListenerImpTezDag.class.getName());
-    this.context = context;
-    this.taskHeartbeatHandler = thh;
-    this.containerHeartbeatHandler = chh;
-    Preconditions.checkArgument(
-        taskCommunicatorDescriptors != null && !taskCommunicatorDescriptors.isEmpty(),
-        "TaskCommunicators must be specified");
-    this.taskCommunicators = new TaskCommunicator[taskCommunicatorDescriptors.size()];
-    this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorDescriptors.size()];
-    this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorDescriptors.size()];
-    for (int i = 0 ; i < taskCommunicatorDescriptors.size() ; i++) {
-      UserPayload userPayload = taskCommunicatorDescriptors.get(i).getUserPayload();
-      taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, userPayload, i);
-      taskCommunicators[i] = createTaskCommunicator(taskCommunicatorDescriptors.get(i), i);
-      taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskCommunicators[i]);
-    }
-    // TODO TEZ-2118 Start using taskCommunicator indices properly
-  }
-
-  @Override
-  public void serviceStart() {
-    // TODO Why is init tied to serviceStart
-    for (int i = 0 ; i < taskCommunicators.length ; i++) {
-      taskCommunicatorServiceWrappers[i].init(getConfig());
-      taskCommunicatorServiceWrappers[i].start();
-    }
-  }
-
-  @Override
-  public void serviceStop() {
-    for (int i = 0 ; i < taskCommunicators.length ; i++) {
-      taskCommunicatorServiceWrappers[i].stop();
-    }
-  }
-
-  @VisibleForTesting
-  TaskCommunicator createTaskCommunicator(NamedEntityDescriptor taskCommDescriptor,
-                                          int taskCommIndex) {
-    if (taskCommDescriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName())) {
-      return createDefaultTaskCommunicator(taskCommunicatorContexts[taskCommIndex]);
-    } else if (taskCommDescriptor.getEntityName()
-        .equals(TezConstants.getTezUberServicePluginName())) {
-      return createUberTaskCommunicator(taskCommunicatorContexts[taskCommIndex]);
-    } else {
-      return createCustomTaskCommunicator(taskCommunicatorContexts[taskCommIndex],
-          taskCommDescriptor);
-    }
-  }
-
-  @VisibleForTesting
-  TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
-    LOG.info("Using Default Task Communicator");
-    return new TezTaskCommunicatorImpl(taskCommunicatorContext);
-  }
-
-  @VisibleForTesting
-  TaskCommunicator createUberTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
-    LOG.info("Using Default Local Task Communicator");
-    return new TezLocalTaskCommunicatorImpl(taskCommunicatorContext);
-  }
-
-  @VisibleForTesting
-  TaskCommunicator createCustomTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext,
-                                                NamedEntityDescriptor taskCommDescriptor) {
-    LOG.info("Using TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(),
-        taskCommDescriptor.getClassName());
-    Class<? extends TaskCommunicator> taskCommClazz =
-        (Class<? extends TaskCommunicator>) ReflectionUtils
-            .getClazz(taskCommDescriptor.getClassName());
-    try {
-      Constructor<? extends TaskCommunicator> ctor =
-          taskCommClazz.getConstructor(TaskCommunicatorContext.class);
-      return ctor.newInstance(taskCommunicatorContext);
-    } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
-      throw new TezUncheckedException(e);
-    }
-  }
-
-  public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
-      throws IOException, TezException {
-    ContainerId containerId = ConverterUtils.toContainerId(request
-        .getContainerIdentifier());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Received heartbeat from container"
-          + ", request=" + request);
-    }
-
-    if (!registeredContainers.containsKey(containerId)) {
-      LOG.warn("Received task heartbeat from unknown container with id: " + containerId +
-          ", asking it to die");
-      return RESPONSE_SHOULD_DIE;
-    }
-
-    // A heartbeat can come in anytime. The AM may have made a decision to kill a running task/container
-    // meanwhile. If the decision is processed through the pipeline before the heartbeat is processed,
-    // the heartbeat will be dropped. Otherwise the heartbeat will be processed - and the system
-    // know how to handle this - via FailedInputEvents for example (relevant only if the heartbeat has events).
-    // So - avoiding synchronization.
-
-    pingContainerHeartbeatHandler(containerId);
-    TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null, 0);
-    TezTaskAttemptID taskAttemptID = request.getTaskAttemptId();
-    if (taskAttemptID != null) {
-      ContainerId containerIdFromMap = registeredAttempts.get(taskAttemptID);
-      if (containerIdFromMap == null || !containerIdFromMap.equals(containerId)) {
-        // This can happen when a task heartbeats. Meanwhile the container is unregistered.
-        // The information will eventually make it through to the plugin via a corresponding unregister.
-        // There's a race in that case between the unregister making it through, and this method returning.
-        // TODO TEZ-2003 (post) TEZ-2666. An exception back is likely a better approach than sending a shouldDie = true,
-        // so that the plugin can handle the scenario. Alternately augment the response with error codes.
-        // Error codes would be better than exceptions.
-        LOG.info("Attempt: " + taskAttemptID + " is not recognized for heartbeats");
-        return RESPONSE_SHOULD_DIE;
-      }
-
-      List<TezEvent> inEvents = request.getEvents();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Ping from " + taskAttemptID.toString() +
-            " events: " + (inEvents != null ? inEvents.size() : -1));
-      }
-
-      long currTime = context.getClock().getTime();
-      List<TezEvent> otherEvents = new ArrayList<TezEvent>();
-      // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events
-      // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT)
-      // to VertexImpl to ensure the events ordering
-      //  1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent
-      //  2. TaskStatusEvent is handled before TaskAttemptFinishedEvent
-      for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
-        // for now, set the event time on the AM when it is received.
-        // this avoids any time disparity between machines.
-        tezEvent.setEventReceivedTime(currTime);
-        final EventType eventType = tezEvent.getEventType();
-        if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
-          TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
-              (TaskStatusUpdateEvent) tezEvent.getEvent());
-          context.getEventHandler().handle(taskAttemptEvent);
-        } else {
-          otherEvents.add(tezEvent);
-        }
-      }
-      if(!otherEvents.isEmpty()) {
-        TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
-        context.getEventHandler().handle(
-            new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(otherEvents)));
-      }
-      taskHeartbeatHandler.pinged(taskAttemptID);
-      eventInfo = context
-          .getCurrentDAG()
-          .getVertex(taskAttemptID.getTaskID().getVertexID())
-          .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getPreRoutedStartIndex(),
-              request.getMaxEvents());
-    }
-    return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId(), eventInfo.getNextPreRoutedFromEventId());
-  }
-  public void taskAlive(TezTaskAttemptID taskAttemptId) {
-    taskHeartbeatHandler.pinged(taskAttemptId);
-  }
-
-  public void containerAlive(ContainerId containerId) {
-    pingContainerHeartbeatHandler(containerId);
-  }
-
-  public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) {
-    context.getEventHandler()
-        .handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null));
-    pingContainerHeartbeatHandler(containerId);
-  }
-
-  public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
-                         String diagnostics) {
-    // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
-    // and messages from the scheduler will release the container.
-    // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore,
-    // instead of waiting for the unregister to flow through the Container.
-    // Fix along the same lines as TEZ-2124 by introducing an explict context.
-    context.getEventHandler().handle(new TaskAttemptEventAttemptKilled(taskAttemptId,
-        diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
-        taskAttemptEndReason)));
-  }
-
-  public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
-                         String diagnostics) {
-    // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
-    // and messages from the scheduler will release the container.
-    // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore,
-    // instead of waiting for the unregister to flow through the Container.
-    // Fix along the same lines as TEZ-2124 by introducing an explict context.
-    context.getEventHandler().handle(new TaskAttemptEventAttemptFailed(taskAttemptId,
-        TaskAttemptEventType.TA_FAILED, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
-        taskAttemptEndReason)));
-  }
-
-  public void vertexStateUpdateNotificationReceived(VertexStateUpdate event, int taskCommIndex) throws
-      Exception {
-    taskCommunicators[taskCommIndex].onVertexStateUpdated(event);
-  }
-
-
-  /**
-   * Child checking whether it can commit.
-   * <p/>
-   * <br/>
-   * Repeatedly polls the ApplicationMaster whether it
-   * {@link Task#canCommit(TezTaskAttemptID)} This is * a legacy from the
-   * centralized commit protocol handling by the JobTracker.
-   */
-//  @Override
-  public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
-    LOG.info("Commit go/no-go request from " + taskAttemptId.toString());
-    // An attempt is asking if it can commit its output. This can be decided
-    // only by the task which is managing the multiple attempts. So redirect the
-    // request there.
-    taskHeartbeatHandler.progressing(taskAttemptId);
-    pingContainerHeartbeatHandler(taskAttemptId);
-
-    DAG job = context.getCurrentDAG();
-    Task task =
-        job.getVertex(taskAttemptId.getTaskID().getVertexID()).
-            getTask(taskAttemptId.getTaskID());
-    return task.canCommit(taskAttemptId);
-  }
-
-  // The TaskAttemptListener register / unregister methods in this class are not thread safe.
-  // The Tez framework should not invoke these methods from multiple threads.
-  @Override
-  public void dagComplete(DAG dag) {
-    // TODO TEZ-2335. Cleanup TaskHeartbeat handler structures.
-    // TODO TEZ-2345. Also cleanup attemptInfo map, so that any tasks which heartbeat are told to die.
-    // Container structures remain unchanged - since they could be re-used across restarts.
-    // This becomes more relevant when task kills without container kills are allowed.
-
-    // TODO TEZ-2336. Send a signal to containers indicating DAG completion.
-
-    // Inform all communicators of the dagCompletion.
-    for (int i = 0 ; i < taskCommunicators.length ; i++) {
-      ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteStart(dag);
-      taskCommunicators[i].dagComplete(dag.getName());
-      ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteEnd();
-    }
-
-  }
-
-  @Override
-  public void dagSubmitted() {
-    // Nothing to do right now. Indicates that a new DAG has been submitted and
-    // the context has updated information.
-  }
-
-  @Override
-  public void registerRunningContainer(ContainerId containerId, int taskCommId) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("ContainerId: " + containerId + " registered with TaskAttemptListener");
-    }
-    ContainerInfo oldInfo = registeredContainers.put(containerId, NULL_CONTAINER_INFO);
-    if (oldInfo != null) {
-      throw new TezUncheckedException(
-          "Multiple registrations for containerId: " + containerId);
-    }
-    NodeId nodeId = context.getAllContainers().get(containerId).getContainer().getNodeId();
-    taskCommunicators[taskCommId].registerRunningContainer(containerId, nodeId.getHost(),
-        nodeId.getPort());
-  }
-
-  @Override
-  public void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId);
-    }
-    ContainerInfo containerInfo = registeredContainers.remove(containerId);
-    if (containerInfo.taskAttemptId != null) {
-      registeredAttempts.remove(containerInfo.taskAttemptId);
-    }
-    taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason, diagnostics);
-  }
-
-  @Override
-  public void registerTaskAttempt(AMContainerTask amContainerTask,
-                                  ContainerId containerId, int taskCommId) {
-    ContainerInfo containerInfo = registeredContainers.get(containerId);
-    if (containerInfo == null) {
-      throw new TezUncheckedException("Registering task attempt: "
-          + amContainerTask.getTask().getTaskAttemptID() + " to unknown container: " + containerId);
-    }
-    if (containerInfo.taskAttemptId != null) {
-      throw new TezUncheckedException("Registering task attempt: "
-          + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
-          + " with existing assignment to: " +
-          containerInfo.taskAttemptId);
-    }
-
-    // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
-    registeredContainers.put(containerId, new ContainerInfo(amContainerTask.getTask().getTaskAttemptID()));
-
-    ContainerId containerIdFromMap = registeredAttempts.put(
-        amContainerTask.getTask().getTaskAttemptID(), containerId);
-    if (containerIdFromMap != null) {
-      throw new TezUncheckedException("Registering task attempt: "
-          + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
-          + " when already assigned to: " + containerIdFromMap);
-    }
-    taskCommunicators[taskCommId].registerRunningTaskAttempt(containerId, amContainerTask.getTask(),
-        amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(),
-        amContainerTask.haveCredentialsChanged(), amContainerTask.getPriority());
-  }
-
-  @Override
-  public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId, TaskAttemptEndReason endReason, String diagnostics) {
-    ContainerId containerId = registeredAttempts.remove(attemptId);
-    if (containerId == null) {
-      LOG.warn("Unregister task attempt: " + attemptId + " from unknown container");
-      return;
-    }
-    ContainerInfo containerInfo = registeredContainers.get(containerId);
-    if (containerInfo == null) {
-      LOG.warn("Unregister task attempt: " + attemptId +
-          " from non-registered container: " + containerId);
-      return;
-    }
-    // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
-    registeredContainers.put(containerId, NULL_CONTAINER_INFO);
-    taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason, diagnostics);
-  }
-
-  @Override
-  public TaskCommunicator getTaskCommunicator(int taskCommIndex) {
-    return taskCommunicators[taskCommIndex];
-  }
-
-  private void pingContainerHeartbeatHandler(ContainerId containerId) {
-    containerHeartbeatHandler.pinged(containerId);
-  }
-
-  private void pingContainerHeartbeatHandler(TezTaskAttemptID taskAttemptId) {
-    ContainerId containerId = registeredAttempts.get(taskAttemptId);
-    if (containerId != null) {
-      containerHeartbeatHandler.pinged(containerId);
-    } else {
-      LOG.warn("Handling communication from attempt: " + taskAttemptId
-          + ", ContainerId not known for this attempt");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 9d57ac3..071b008 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -47,7 +47,7 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
   // TODO TEZ-2003 (post) TEZ-2669 Propagate errors baack to the AM with proper error reporting
 
   private final AppContext context;
-  private final TaskAttemptListenerImpTezDag taskAttemptListener;
+  private final TaskCommunicatorManager taskCommunicatorManager;
   private final int taskCommunicatorIndex;
   private final ReentrantReadWriteLock.ReadLock dagChangedReadLock;
   private final ReentrantReadWriteLock.WriteLock dagChangedWriteLock;
@@ -56,11 +56,11 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
   private DAG dag;
 
   public TaskCommunicatorContextImpl(AppContext appContext,
-                                     TaskAttemptListenerImpTezDag taskAttemptListener,
+                                     TaskCommunicatorManager taskCommunicatorManager,
                                      UserPayload userPayload,
                                      int taskCommunicatorIndex) {
     this.context = appContext;
-    this.taskAttemptListener = taskAttemptListener;
+    this.taskCommunicatorManager = taskCommunicatorManager;
     this.userPayload = userPayload;
     this.taskCommunicatorIndex = taskCommunicatorIndex;
 
@@ -86,13 +86,13 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
 
   @Override
   public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
-    return taskAttemptListener.canCommit(taskAttemptId);
+    return taskCommunicatorManager.canCommit(taskAttemptId);
   }
 
   @Override
   public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException,
       TezException {
-    return taskAttemptListener.heartbeat(request);
+    return taskCommunicatorManager.heartbeat(request);
   }
 
   @Override
@@ -108,31 +108,31 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
 
   @Override
   public void taskAlive(TezTaskAttemptID taskAttemptId) {
-    taskAttemptListener.taskAlive(taskAttemptId);
+    taskCommunicatorManager.taskAlive(taskAttemptId);
   }
 
   @Override
   public void containerAlive(ContainerId containerId) {
     if (isKnownContainer(containerId)) {
-      taskAttemptListener.containerAlive(containerId);
+      taskCommunicatorManager.containerAlive(containerId);
     }
   }
 
   @Override
   public void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId) {
-    taskAttemptListener.taskStartedRemotely(taskAttemptId, containerId);
+    taskCommunicatorManager.taskStartedRemotely(taskAttemptId, containerId);
   }
 
   @Override
   public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
                          @Nullable String diagnostics) {
-    taskAttemptListener.taskKilled(taskAttemptId, taskAttemptEndReason, diagnostics);
+    taskCommunicatorManager.taskKilled(taskAttemptId, taskAttemptEndReason, diagnostics);
   }
 
   @Override
   public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
                          @Nullable String diagnostics) {
-    taskAttemptListener.taskFailed(taskAttemptId, taskAttemptEndReason, diagnostics);
+    taskCommunicatorManager.taskFailed(taskAttemptId, taskAttemptEndReason, diagnostics);
 
   }
 
@@ -196,7 +196,7 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
   @Override
   public void onStateUpdated(VertexStateUpdate event) {
     try {
-      taskAttemptListener.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex);
+      taskCommunicatorManager.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex);
     } catch (Exception e) {
       throw new TezUncheckedException(e);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
new file mode 100644
index 0000000..42df259
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections4.ListUtils;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.dag.api.TaskHeartbeatResponse;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.TaskHeartbeatRequest;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerTask;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+
+@SuppressWarnings("unchecked")
+@InterfaceAudience.Private
+public class TaskCommunicatorManager extends AbstractService implements
+    TaskCommunicatorManagerInterface {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TaskCommunicatorManager.class);
+
+  private final AppContext context;
+  private final TaskCommunicator[] taskCommunicators;
+  private final TaskCommunicatorContext[] taskCommunicatorContexts;
+  protected final ServicePluginLifecycleAbstractService []taskCommunicatorServiceWrappers;
+
+  protected final TaskHeartbeatHandler taskHeartbeatHandler;
+  protected final ContainerHeartbeatHandler containerHeartbeatHandler;
+
+  private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0, 0);
+
+  private final ConcurrentMap<TezTaskAttemptID, ContainerId> registeredAttempts =
+      new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
+  private final ConcurrentMap<ContainerId, ContainerInfo> registeredContainers =
+      new ConcurrentHashMap<ContainerId, ContainerInfo>();
+
+  // Defined primarily to work around ConcurrentMaps not accepting null values
+  private static final class ContainerInfo {
+    TezTaskAttemptID taskAttemptId;
+    ContainerInfo(TezTaskAttemptID taskAttemptId) {
+      this.taskAttemptId = taskAttemptId;
+    }
+  }
+
+  private static final ContainerInfo NULL_CONTAINER_INFO = new ContainerInfo(null);
+
+
+  public TaskCommunicatorManager(AppContext context,
+                                 TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
+                                 List<NamedEntityDescriptor> taskCommunicatorDescriptors) {
+    super(TaskCommunicatorManager.class.getName());
+    this.context = context;
+    this.taskHeartbeatHandler = thh;
+    this.containerHeartbeatHandler = chh;
+    Preconditions.checkArgument(
+        taskCommunicatorDescriptors != null && !taskCommunicatorDescriptors.isEmpty(),
+        "TaskCommunicators must be specified");
+    this.taskCommunicators = new TaskCommunicator[taskCommunicatorDescriptors.size()];
+    this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorDescriptors.size()];
+    this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorDescriptors.size()];
+    for (int i = 0 ; i < taskCommunicatorDescriptors.size() ; i++) {
+      UserPayload userPayload = taskCommunicatorDescriptors.get(i).getUserPayload();
+      taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, userPayload, i);
+      taskCommunicators[i] = createTaskCommunicator(taskCommunicatorDescriptors.get(i), i);
+      taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskCommunicators[i]);
+    }
+    // TODO TEZ-2118 Start using taskCommunicator indices properly
+  }
+
+  @Override
+  public void serviceStart() {
+    // TODO Why is init tied to serviceStart
+    for (int i = 0 ; i < taskCommunicators.length ; i++) {
+      taskCommunicatorServiceWrappers[i].init(getConfig());
+      taskCommunicatorServiceWrappers[i].start();
+    }
+  }
+
+  @Override
+  public void serviceStop() {
+    for (int i = 0 ; i < taskCommunicators.length ; i++) {
+      taskCommunicatorServiceWrappers[i].stop();
+    }
+  }
+
+  @VisibleForTesting
+  TaskCommunicator createTaskCommunicator(NamedEntityDescriptor taskCommDescriptor,
+                                          int taskCommIndex) {
+    if (taskCommDescriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName())) {
+      return createDefaultTaskCommunicator(taskCommunicatorContexts[taskCommIndex]);
+    } else if (taskCommDescriptor.getEntityName()
+        .equals(TezConstants.getTezUberServicePluginName())) {
+      return createUberTaskCommunicator(taskCommunicatorContexts[taskCommIndex]);
+    } else {
+      return createCustomTaskCommunicator(taskCommunicatorContexts[taskCommIndex],
+          taskCommDescriptor);
+    }
+  }
+
+  @VisibleForTesting
+  TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
+    LOG.info("Using Default Task Communicator");
+    return new TezTaskCommunicatorImpl(taskCommunicatorContext);
+  }
+
+  @VisibleForTesting
+  TaskCommunicator createUberTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
+    LOG.info("Using Default Local Task Communicator");
+    return new TezLocalTaskCommunicatorImpl(taskCommunicatorContext);
+  }
+
+  @VisibleForTesting
+  TaskCommunicator createCustomTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext,
+                                                NamedEntityDescriptor taskCommDescriptor) {
+    LOG.info("Using TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(),
+        taskCommDescriptor.getClassName());
+    Class<? extends TaskCommunicator> taskCommClazz =
+        (Class<? extends TaskCommunicator>) ReflectionUtils
+            .getClazz(taskCommDescriptor.getClassName());
+    try {
+      Constructor<? extends TaskCommunicator> ctor =
+          taskCommClazz.getConstructor(TaskCommunicatorContext.class);
+      return ctor.newInstance(taskCommunicatorContext);
+    } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
+      throw new TezUncheckedException(e);
+    }
+  }
+
+  public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
+      throws IOException, TezException {
+    ContainerId containerId = ConverterUtils.toContainerId(request
+        .getContainerIdentifier());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Received heartbeat from container"
+          + ", request=" + request);
+    }
+
+    if (!registeredContainers.containsKey(containerId)) {
+      LOG.warn("Received task heartbeat from unknown container with id: " + containerId +
+          ", asking it to die");
+      return RESPONSE_SHOULD_DIE;
+    }
+
+    // A heartbeat can come in anytime. The AM may have made a decision to kill a running task/container
+    // meanwhile. If the decision is processed through the pipeline before the heartbeat is processed,
+    // the heartbeat will be dropped. Otherwise the heartbeat will be processed - and the system
+    // know how to handle this - via FailedInputEvents for example (relevant only if the heartbeat has events).
+    // So - avoiding synchronization.
+
+    pingContainerHeartbeatHandler(containerId);
+    TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null, 0);
+    TezTaskAttemptID taskAttemptID = request.getTaskAttemptId();
+    if (taskAttemptID != null) {
+      ContainerId containerIdFromMap = registeredAttempts.get(taskAttemptID);
+      if (containerIdFromMap == null || !containerIdFromMap.equals(containerId)) {
+        // This can happen when a task heartbeats. Meanwhile the container is unregistered.
+        // The information will eventually make it through to the plugin via a corresponding unregister.
+        // There's a race in that case between the unregister making it through, and this method returning.
+        // TODO TEZ-2003 (post) TEZ-2666. An exception back is likely a better approach than sending a shouldDie = true,
+        // so that the plugin can handle the scenario. Alternately augment the response with error codes.
+        // Error codes would be better than exceptions.
+        LOG.info("Attempt: " + taskAttemptID + " is not recognized for heartbeats");
+        return RESPONSE_SHOULD_DIE;
+      }
+
+      List<TezEvent> inEvents = request.getEvents();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Ping from " + taskAttemptID.toString() +
+            " events: " + (inEvents != null ? inEvents.size() : -1));
+      }
+
+      long currTime = context.getClock().getTime();
+      List<TezEvent> otherEvents = new ArrayList<TezEvent>();
+      // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events
+      // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT)
+      // to VertexImpl to ensure the events ordering
+      //  1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent
+      //  2. TaskStatusEvent is handled before TaskAttemptFinishedEvent
+      for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
+        // for now, set the event time on the AM when it is received.
+        // this avoids any time disparity between machines.
+        tezEvent.setEventReceivedTime(currTime);
+        final EventType eventType = tezEvent.getEventType();
+        if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
+          TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
+              (TaskStatusUpdateEvent) tezEvent.getEvent());
+          context.getEventHandler().handle(taskAttemptEvent);
+        } else {
+          otherEvents.add(tezEvent);
+        }
+      }
+      if(!otherEvents.isEmpty()) {
+        TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
+        context.getEventHandler().handle(
+            new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(otherEvents)));
+      }
+      taskHeartbeatHandler.pinged(taskAttemptID);
+      eventInfo = context
+          .getCurrentDAG()
+          .getVertex(taskAttemptID.getTaskID().getVertexID())
+          .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getPreRoutedStartIndex(),
+              request.getMaxEvents());
+    }
+    return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId(), eventInfo.getNextPreRoutedFromEventId());
+  }
+  public void taskAlive(TezTaskAttemptID taskAttemptId) {
+    taskHeartbeatHandler.pinged(taskAttemptId);
+  }
+
+  public void containerAlive(ContainerId containerId) {
+    pingContainerHeartbeatHandler(containerId);
+  }
+
+  public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) {
+    context.getEventHandler()
+        .handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null));
+    pingContainerHeartbeatHandler(containerId);
+  }
+
+  public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+                         String diagnostics) {
+    // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
+    // and messages from the scheduler will release the container.
+    // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore,
+    // instead of waiting for the unregister to flow through the Container.
+    // Fix along the same lines as TEZ-2124 by introducing an explict context.
+    context.getEventHandler().handle(new TaskAttemptEventAttemptKilled(taskAttemptId,
+        diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
+        taskAttemptEndReason)));
+  }
+
+  public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+                         String diagnostics) {
+    // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
+    // and messages from the scheduler will release the container.
+    // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore,
+    // instead of waiting for the unregister to flow through the Container.
+    // Fix along the same lines as TEZ-2124 by introducing an explict context.
+    context.getEventHandler().handle(new TaskAttemptEventAttemptFailed(taskAttemptId,
+        TaskAttemptEventType.TA_FAILED, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
+        taskAttemptEndReason)));
+  }
+
+  public void vertexStateUpdateNotificationReceived(VertexStateUpdate event, int taskCommIndex) throws
+      Exception {
+    taskCommunicators[taskCommIndex].onVertexStateUpdated(event);
+  }
+
+
+  /**
+   * Child checking whether it can commit.
+   * <p/>
+   * <br/>
+   * Repeatedly polls the ApplicationMaster whether it
+   * {@link Task#canCommit(TezTaskAttemptID)} This is * a legacy from the
+   * centralized commit protocol handling by the JobTracker.
+   */
+//  @Override
+  public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
+    LOG.info("Commit go/no-go request from " + taskAttemptId.toString());
+    // An attempt is asking if it can commit its output. This can be decided
+    // only by the task which is managing the multiple attempts. So redirect the
+    // request there.
+    taskHeartbeatHandler.progressing(taskAttemptId);
+    pingContainerHeartbeatHandler(taskAttemptId);
+
+    DAG job = context.getCurrentDAG();
+    Task task =
+        job.getVertex(taskAttemptId.getTaskID().getVertexID()).
+            getTask(taskAttemptId.getTaskID());
+    return task.canCommit(taskAttemptId);
+  }
+
+  // The TaskAttemptListener register / unregister methods in this class are not thread safe.
+  // The Tez framework should not invoke these methods from multiple threads.
+  @Override
+  public void dagComplete(DAG dag) {
+    // TODO TEZ-2335. Cleanup TaskHeartbeat handler structures.
+    // TODO TEZ-2345. Also cleanup attemptInfo map, so that any tasks which heartbeat are told to die.
+    // Container structures remain unchanged - since they could be re-used across restarts.
+    // This becomes more relevant when task kills without container kills are allowed.
+
+    // TODO TEZ-2336. Send a signal to containers indicating DAG completion.
+
+    // Inform all communicators of the dagCompletion.
+    for (int i = 0 ; i < taskCommunicators.length ; i++) {
+      ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteStart(dag);
+      taskCommunicators[i].dagComplete(dag.getName());
+      ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteEnd();
+    }
+
+  }
+
+  @Override
+  public void dagSubmitted() {
+    // Nothing to do right now. Indicates that a new DAG has been submitted and
+    // the context has updated information.
+  }
+
+  @Override
+  public void registerRunningContainer(ContainerId containerId, int taskCommId) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ContainerId: " + containerId + " registered with TaskAttemptListener");
+    }
+    ContainerInfo oldInfo = registeredContainers.put(containerId, NULL_CONTAINER_INFO);
+    if (oldInfo != null) {
+      throw new TezUncheckedException(
+          "Multiple registrations for containerId: " + containerId);
+    }
+    NodeId nodeId = context.getAllContainers().get(containerId).getContainer().getNodeId();
+    taskCommunicators[taskCommId].registerRunningContainer(containerId, nodeId.getHost(),
+        nodeId.getPort());
+  }
+
+  @Override
+  public void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId);
+    }
+    ContainerInfo containerInfo = registeredContainers.remove(containerId);
+    if (containerInfo.taskAttemptId != null) {
+      registeredAttempts.remove(containerInfo.taskAttemptId);
+    }
+    taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason, diagnostics);
+  }
+
+  @Override
+  public void registerTaskAttempt(AMContainerTask amContainerTask,
+                                  ContainerId containerId, int taskCommId) {
+    ContainerInfo containerInfo = registeredContainers.get(containerId);
+    if (containerInfo == null) {
+      throw new TezUncheckedException("Registering task attempt: "
+          + amContainerTask.getTask().getTaskAttemptID() + " to unknown container: " + containerId);
+    }
+    if (containerInfo.taskAttemptId != null) {
+      throw new TezUncheckedException("Registering task attempt: "
+          + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
+          + " with existing assignment to: " +
+          containerInfo.taskAttemptId);
+    }
+
+    // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
+    registeredContainers.put(containerId, new ContainerInfo(amContainerTask.getTask().getTaskAttemptID()));
+
+    ContainerId containerIdFromMap = registeredAttempts.put(
+        amContainerTask.getTask().getTaskAttemptID(), containerId);
+    if (containerIdFromMap != null) {
+      throw new TezUncheckedException("Registering task attempt: "
+          + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
+          + " when already assigned to: " + containerIdFromMap);
+    }
+    taskCommunicators[taskCommId].registerRunningTaskAttempt(containerId, amContainerTask.getTask(),
+        amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(),
+        amContainerTask.haveCredentialsChanged(), amContainerTask.getPriority());
+  }
+
+  @Override
+  public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId, TaskAttemptEndReason endReason, String diagnostics) {
+    ContainerId containerId = registeredAttempts.remove(attemptId);
+    if (containerId == null) {
+      LOG.warn("Unregister task attempt: " + attemptId + " from unknown container");
+      return;
+    }
+    ContainerInfo containerInfo = registeredContainers.get(containerId);
+    if (containerInfo == null) {
+      LOG.warn("Unregister task attempt: " + attemptId +
+          " from non-registered container: " + containerId);
+      return;
+    }
+    // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
+    registeredContainers.put(containerId, NULL_CONTAINER_INFO);
+    taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason, diagnostics);
+  }
+
+  @Override
+  public TaskCommunicator getTaskCommunicator(int taskCommIndex) {
+    return taskCommunicators[taskCommIndex];
+  }
+
+  private void pingContainerHeartbeatHandler(ContainerId containerId) {
+    containerHeartbeatHandler.pinged(containerId);
+  }
+
+  private void pingContainerHeartbeatHandler(TezTaskAttemptID taskAttemptId) {
+    ContainerId containerId = registeredAttempts.get(taskAttemptId);
+    if (containerId != null) {
+      containerHeartbeatHandler.pinged(containerId);
+    } else {
+      LOG.warn("Handling communication from attempt: " + taskAttemptId
+          + ", ContainerId not known for this attempt");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
new file mode 100644
index 0000000..8d060a2
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
@@ -0,0 +1,46 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.dag.app.rm.container.AMContainerTask;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+/**
+ * This class listens for changes to the state of a Task.
+ */
+public interface TaskCommunicatorManagerInterface {
+
+  void registerRunningContainer(ContainerId containerId, int taskCommId);
+
+  void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId);
+  
+  void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics);
+  
+  void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId, TaskAttemptEndReason endReason, String diagnostics);
+
+  void dagComplete(DAG dag);
+
+  void dagSubmitted();
+
+  TaskCommunicator getTaskCommunicator(int taskCommIndex);
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 6b474ff..756ed28 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -82,7 +82,7 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.DAGReport;
@@ -160,7 +160,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private final Lock readLock;
   private final Lock writeLock;
   private final String dagName;
-  private final TaskAttemptListener taskAttemptListener;
+  private final TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
   private final TaskHeartbeatHandler taskHeartbeatHandler;
   private final Object tasksSyncHandle = new Object();
 
@@ -489,7 +489,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       Configuration amConf,
       DAGPlan jobPlan,
       EventHandler eventHandler,
-      TaskAttemptListener taskAttemptListener,
+      TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
       Credentials dagCredentials,
       Clock clock,
       String appUserName,
@@ -511,7 +511,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     this.clock = clock;
     this.appContext = appContext;
 
-    this.taskAttemptListener = taskAttemptListener;
+    this.taskCommunicatorManagerInterface = taskCommunicatorManagerInterface;
     this.taskHeartbeatHandler = thh;
     this.eventHandler = eventHandler;
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -1538,7 +1538,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
     VertexImpl v = new VertexImpl(
         vertexId, vertexPlan, vertexName, dag.dagConf,
-        dag.eventHandler, dag.taskAttemptListener,
+        dag.eventHandler, dag.taskCommunicatorManagerInterface,
         dag.clock, dag.taskHeartbeatHandler,
         !dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint,
         dag.vertexGroups, dag.taskSpecificLaunchCmdOption, dag.entityUpdateTracker);

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 6957b1d..3f2e3a4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -61,7 +61,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
@@ -403,17 +403,17 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   @SuppressWarnings("rawtypes")
   public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
-      TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock,
+      TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Configuration conf, Clock clock,
       TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
       boolean isRescheduled,
       Resource resource, ContainerContext containerContext, boolean leafVertex,
       Task task) {
-    this(taskId, attemptNumber, eventHandler, taskAttemptListener, conf, clock,
+    this(taskId, attemptNumber, eventHandler, taskCommunicatorManagerInterface, conf, clock,
         taskHeartbeatHandler, appContext, isRescheduled, resource, containerContext, leafVertex,
         task, null);
   }
   public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
-      TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock,
+      TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Configuration conf, Clock clock,
       TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
       boolean isRescheduled,
       Resource resource, ContainerContext containerContext, boolean leafVertex,

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 1b55295..ea14483 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -56,7 +56,7 @@ import org.apache.tez.dag.api.oldrecords.TaskReport;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.Task;
@@ -115,7 +115,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     .getProperty("line.separator");
 
   protected final Configuration conf;
-  protected final TaskAttemptListener taskAttemptListener;
+  protected final TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
   protected final TaskHeartbeatHandler taskHeartbeatHandler;
   protected final EventHandler eventHandler;
   private final TezTaskID taskId;
@@ -341,7 +341,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   public TaskImpl(TezVertexID vertexId, int taskIndex,
       EventHandler eventHandler, Configuration conf,
-      TaskAttemptListener taskAttemptListener,
+      TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
       Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
       boolean leafVertex, Resource resource,
       ContainerContext containerContext,
@@ -357,7 +357,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     maxFailedAttempts = this.conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
                               TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT);
     taskId = TezTaskID.getInstance(vertexId, taskIndex);
-    this.taskAttemptListener = taskAttemptListener;
+    this.taskCommunicatorManagerInterface = taskCommunicatorManagerInterface;
     this.taskHeartbeatHandler = thh;
     this.eventHandler = eventHandler;
     this.appContext = appContext;
@@ -817,7 +817,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   
   TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) {
     return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
-        taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,
+        taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext,
         (failedAttempts > 0), taskResource, containerContext, leafVertex, this, schedulingCausalTA);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 3cc439f..a1dcf6c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -94,7 +94,7 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptEventInfo;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.RootInputInitializerManager;
@@ -212,7 +212,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   private final Lock readLock;
   private final Lock writeLock;
-  private final TaskAttemptListener taskAttemptListener;
+  private final TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
   private final TaskHeartbeatHandler taskHeartbeatHandler;
   private final Object tasksSyncHandle = new Object();
 
@@ -890,7 +890,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
       String vertexName, Configuration dagConf, EventHandler eventHandler,
-      TaskAttemptListener taskAttemptListener, Clock clock,
+      TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Clock clock,
       TaskHeartbeatHandler thh, boolean commitVertexOutputs,
       AppContext appContext, VertexLocationHint vertexLocationHint,
       Map<String, VertexGroupInfo> dagVertexGroups, TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption,
@@ -911,7 +911,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     this.appContext = appContext;
     this.commitVertexOutputs = commitVertexOutputs;
 
-    this.taskAttemptListener = taskAttemptListener;
+    this.taskCommunicatorManagerInterface = taskCommunicatorManagerInterface;
     this.taskHeartbeatHandler = thh;
     this.eventHandler = eventHandler;
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -2331,7 +2331,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     return new TaskImpl(this.getVertexId(), taskIndex,
         this.eventHandler,
         vertexConf,
-        this.taskAttemptListener,
+        this.taskCommunicatorManagerInterface,
         this.clock,
         this.taskHeartbeatHandler,
         this.appContext,