You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 09:41:39 UTC

[37/50] [abbrv] tez git commit: TEZ-2123. Fix component managers to use pluggable components. Enable hybrid mode. (sseth)

TEZ-2123. Fix component managers to use pluggable components. Enable
hybrid mode. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/25980c1a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/25980c1a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/25980c1a

Branch: refs/heads/TEZ-2003
Commit: 25980c1a70a05fb915e89d246643dc8549e793cc
Parents: adab48f
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Feb 20 11:59:03 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed May 6 00:13:29 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |   1 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   4 +-
 .../apache/tez/dag/app/TaskAttemptListener.java |  12 +-
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  30 ++--
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |   4 +-
 .../TezRootInputInitializerContextImpl.java     |   2 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   2 +-
 .../tez/dag/app/dag/impl/VertexManager.java     |   2 +-
 .../app/launcher/ContainerLauncherRouter.java   |   2 +-
 .../app/launcher/LocalContainerLauncher.java    |  10 +-
 .../rm/AMSchedulerEventDeallocateContainer.java |   7 +-
 .../rm/AMSchedulerEventNodeBlacklistUpdate.java |   8 +-
 .../tez/dag/app/rm/AMSchedulerEventTAEnded.java |  10 +-
 .../dag/app/rm/LocalTaskSchedulerService.java   |  19 ++-
 .../tez/dag/app/rm/NMCommunicatorEvent.java     |  12 +-
 .../rm/NMCommunicatorLaunchRequestEvent.java    |  11 +-
 .../app/rm/NMCommunicatorStopRequestEvent.java  |   4 +-
 .../dag/app/rm/TaskSchedulerEventHandler.java   | 151 ++++++++++++-----
 .../tez/dag/app/rm/container/AMContainer.java   |   3 +
 .../AMContainerEventLaunchRequest.java          |  15 +-
 .../dag/app/rm/container/AMContainerImpl.java   |  39 +++--
 .../dag/app/rm/container/AMContainerMap.java    |   4 +-
 .../apache/tez/dag/app/rm/node/AMNodeImpl.java  |   6 +-
 .../apache/tez/dag/app/MockDAGAppMaster.java    |   2 +-
 .../app/TestTaskAttemptListenerImplTezDag.java  |  31 ++--
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |  69 ++++----
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |   8 +-
 .../tez/dag/app/rm/TestContainerReuse.java      |  34 ++--
 .../tez/dag/app/rm/TestLocalTaskScheduler.java  |   2 +-
 .../app/rm/TestLocalTaskSchedulerService.java   |  18 ++-
 .../app/rm/TestTaskSchedulerEventHandler.java   |  11 +-
 .../dag/app/rm/TestTaskSchedulerHelpers.java    |   2 +-
 .../dag/app/rm/container/TestAMContainer.java   | 108 +++++++------
 .../app/rm/container/TestAMContainerMap.java    |   6 +-
 .../org/apache/tez/examples/JoinValidate.java   |  30 +++-
 .../TezTestServiceContainerLauncher.java        |   5 +-
 .../rm/TezTestServiceTaskSchedulerService.java  | 100 ++----------
 .../tez/examples/JoinValidateConfigured.java    |  53 ++++++
 .../tez/tests/TestExternalTezServices.java      | 160 ++++++++++++++-----
 39 files changed, 638 insertions(+), 359 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 4bfe08f..1a2264c 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -4,5 +4,6 @@ ALL CHANGES:
   TEZ-2090. Add tests for jobs running in external services.
   TEZ-2117. Add a manager for ContainerLaunchers running in the AM.
   TEZ-2122. Setup pluggable components at AM/Vertex level.
+  TEZ-2123. Fix component managers to use pluggable components. (Enable hybrid mode)
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 6814cda..89b6506 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -513,7 +513,7 @@ public class DAGAppMaster extends AbstractService {
 
     this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
         clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService,
-        taskSchedulerClassIdentifiers);
+        taskSchedulerClassIdentifiers, isLocal);
     addIfService(taskSchedulerEventHandler, true);
 
     if (enableWebUIService()) {
@@ -2283,6 +2283,7 @@ public class DAGAppMaster extends AbstractService {
   // Tez default classnames are populated as TezConfiguration.TEZ_AM_SERVICE_PLUGINS_DEFAULT
   private String[] parsePlugins(BiMap<String, Integer> pluginMap, String[] pluginStrings,
                                    String context) {
+    // TODO TEZ-2003 Duplicate error checking - ideally in the client itself. Depends on the final API.
     Preconditions.checkState(pluginStrings != null && pluginStrings.length > 0,
         "Plugin strings should not be null or empty: " + context);
 
@@ -2320,6 +2321,7 @@ public class DAGAppMaster extends AbstractService {
       }
       pluginMap.put(identifierString, index);
       classNames[index] = className;
+      index++;
     }
     return classNames;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
index 9caa7cf..e4dad27 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.app.rm.container.AMContainerTask;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 /**
@@ -29,18 +30,17 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
  */
 public interface TaskAttemptListener {
 
-  InetSocketAddress getAddress();
+  void registerRunningContainer(ContainerId containerId, int taskCommId);
 
-  void registerRunningContainer(ContainerId containerId);
-
-  void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId);
+  void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId);
   
-  void unregisterRunningContainer(ContainerId containerId);
+  void unregisterRunningContainer(ContainerId containerId, int taskCommId);
   
-  void unregisterTaskAttempt(TezTaskAttemptID attemptID);
+  void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId);
 
   void dagComplete(DAG dag);
 
   void dagSubmitted();
 
+  TaskCommunicator getTaskCommunicator(int taskCommIndex);
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index fc4d787..71b0d2a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -273,11 +273,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     return task.canCommit(taskAttemptId);
   }
 
-  @Override
-  public InetSocketAddress getAddress() {
-    return taskCommunicators[0].getAddress();
-  }
-
   // The TaskAttemptListener register / unregister methods in this class are not thread safe.
   // The Tez framework should not invoke these methods from multiple threads.
   @Override
@@ -297,7 +292,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   }
 
   @Override
-  public void registerRunningContainer(ContainerId containerId) {
+  public void registerRunningContainer(ContainerId containerId, int taskCommId) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("ContainerId: " + containerId + " registered with TaskAttemptListener");
     }
@@ -307,11 +302,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
           "Multiple registrations for containerId: " + containerId);
     }
     NodeId nodeId = context.getAllContainers().get(containerId).getContainer().getNodeId();
-    taskCommunicators[0].registerRunningContainer(containerId, nodeId.getHost(), nodeId.getPort());
+    taskCommunicators[taskCommId].registerRunningContainer(containerId, nodeId.getHost(),
+        nodeId.getPort());
   }
 
   @Override
-  public void unregisterRunningContainer(ContainerId containerId) {
+  public void unregisterRunningContainer(ContainerId containerId, int taskCommId) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId);
     }
@@ -319,12 +315,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     if (containerInfo.taskAttemptId != null) {
       registeredAttempts.remove(containerInfo.taskAttemptId);
     }
-    taskCommunicators[0].registerContainerEnd(containerId);
+    taskCommunicators[taskCommId].registerContainerEnd(containerId);
   }
 
   @Override
   public void registerTaskAttempt(AMContainerTask amContainerTask,
-                                  ContainerId containerId) {
+                                  ContainerId containerId, int taskCommId) {
     ContainerInfo containerInfo = registeredContainers.get(containerId);
     if (containerInfo == null) {
       throw new TezUncheckedException("Registering task attempt: "
@@ -354,13 +350,13 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
           + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
           + " when already assigned to: " + containerIdFromMap);
     }
-    taskCommunicators[0].registerRunningTaskAttempt(containerId, amContainerTask.getTask(),
+    taskCommunicators[taskCommId].registerRunningTaskAttempt(containerId, amContainerTask.getTask(),
         amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(),
         amContainerTask.haveCredentialsChanged());
   }
 
   @Override
-  public void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
+  public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId) {
     ContainerId containerId = registeredAttempts.remove(attemptId);
     if (containerId == null) {
       LOG.warn("Unregister task attempt: " + attemptId + " from unknown container");
@@ -374,7 +370,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
     // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
     registeredContainers.put(containerId, NULL_CONTAINER_INFO);
-    taskCommunicators[0].unregisterRunningTaskAttempt(attemptId);
+    taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId);
+  }
+
+  @Override
+  public TaskCommunicator getTaskCommunicator(int taskCommIndex) {
+    return taskCommunicators[taskCommIndex];
   }
 
   private void pingContainerHeartbeatHandler(ContainerId containerId) {
@@ -413,7 +414,4 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     return taskAttemptEvent;
   }
 
-  public TaskCommunicator getTaskCommunicator() {
-    return taskCommunicators[0];
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index c18dc00..c80571d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1218,7 +1218,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       // Inform the scheduler
       if (sendSchedulerEvent()) {
         ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, helper
-            .getTaskAttemptState()));
+            .getTaskAttemptState(), ta.getVertex().getTaskSchedulerIdentifier()));
       }
     }
   }
@@ -1300,7 +1300,7 @@ public class TaskAttemptImpl implements TaskAttempt,
 
       // Inform the Scheduler.
       ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId,
-          TaskAttemptState.SUCCEEDED));
+          TaskAttemptState.SUCCEEDED, ta.getVertex().getTaskSchedulerIdentifier()));
 
       // Inform the task.
       ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
index d4ef4d5..4ca4024 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
@@ -96,7 +96,7 @@ public class TezRootInputInitializerContextImpl implements
 
   @Override
   public Resource getTotalAvailableResource() {
-    return appContext.getTaskScheduler().getTotalResources();
+    return appContext.getTaskScheduler().getTotalResources(vertex.getTaskSchedulerIdentifier());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index ddf670f..81e1732 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -4207,7 +4207,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         eventHandler, getTotalTasks(),
         appContext.getTaskScheduler().getNumClusterNodes(),
         getTaskResource(),
-        appContext.getTaskScheduler().getTotalResources());
+        appContext.getTaskScheduler().getTotalResources(taskSchedulerIdentifier));
     List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
         inputList = Lists.newArrayListWithCapacity(inputsWithInitializers.size());
     for (String inputName : inputsWithInitializers) {

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 945d9ba..1300fc0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -286,7 +286,7 @@ public class VertexManager {
     @Override
     public synchronized Resource getTotalAvailableResource() {
       checkAndThrowIfDone();
-      return appContext.getTaskScheduler().getTotalResources();
+      return appContext.getTaskScheduler().getTotalResources(managedVertex.getTaskSchedulerIdentifier());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index 621e4a8..4f9b5bf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -124,6 +124,6 @@ public class ContainerLauncherRouter extends AbstractService
 
   @Override
   public void handle(NMCommunicatorEvent event) {
-    containerLaunchers[0].handle(event);
+    containerLaunchers[event.getLauncherId()].handle(event);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index e9ba9d7..9a38732 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -59,7 +59,6 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
-import org.apache.tez.dag.app.TaskAttemptListenerImpTezDag;
 import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
 import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
 import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
@@ -88,9 +87,9 @@ public class LocalContainerLauncher extends AbstractService implements
 
   private static final Logger LOG = LoggerFactory.getLogger(LocalContainerLauncher.class);
   private final AppContext context;
-  private final TezTaskUmbilicalProtocol taskUmbilicalProtocol;
   private final AtomicBoolean serviceStopped = new AtomicBoolean(false);
   private final String workingDirectory;
+  private final TaskAttemptListener tal;
   private final Map<String, String> localEnv = new HashMap<String, String>();
   private final ExecutionContext executionContext;
   private int numExecutors;
@@ -116,9 +115,8 @@ public class LocalContainerLauncher extends AbstractService implements
                                 String workingDirectory) throws UnknownHostException {
     super(LocalContainerLauncher.class.getName());
     this.context = context;
-    TaskAttemptListenerImpTezDag taListener = (TaskAttemptListenerImpTezDag)taskAttemptListener;
-    TezTaskCommunicatorImpl taskComm = (TezTaskCommunicatorImpl) taListener.getTaskCommunicator();
-    this.taskUmbilicalProtocol = taskComm.getUmbilical();
+    this.tal = taskAttemptListener;
+
     this.workingDirectory = workingDirectory;
     AuxiliaryServiceHelper.setServiceDataIntoEnv(
         ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(0), localEnv);
@@ -219,7 +217,7 @@ public class LocalContainerLauncher extends AbstractService implements
         tezChild =
             createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier,
                 context.getApplicationAttemptId().getAttemptId(), context.getLocalDirs(),
-                taskUmbilicalProtocol,
+                ((TezTaskCommunicatorImpl)tal.getTaskCommunicator(event.getTaskCommId())).getUmbilical(),
                 TezCommonUtils.parseCredentialsBytes(event.getContainerLaunchContext().getTokens().array()));
       } catch (InterruptedException e) {
         handleLaunchFailed(e, event.getContainerId());

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
index 1b51920..5270aa2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
@@ -23,15 +23,20 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 public class AMSchedulerEventDeallocateContainer extends AMSchedulerEvent {
 
   private final ContainerId containerId;
+  private final int schedulerId;
   
-  public AMSchedulerEventDeallocateContainer(ContainerId containerId) {
+  public AMSchedulerEventDeallocateContainer(ContainerId containerId, int schedulerId) {
     super(AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
     this.containerId = containerId;
+    this.schedulerId = schedulerId;
   }
   
   public ContainerId getContainerId() {
     return this.containerId;
   }
 
+  public int getSchedulerId() {
+    return schedulerId;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
index ed7ebc3..679705a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
@@ -23,14 +23,20 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 public class AMSchedulerEventNodeBlacklistUpdate extends AMSchedulerEvent {
 
   private final NodeId nodeId;
+  private final int schedulerId;
 
-  public AMSchedulerEventNodeBlacklistUpdate(NodeId nodeId, boolean add) {
+  public AMSchedulerEventNodeBlacklistUpdate(NodeId nodeId, boolean add, int schedulerId) {
     super((add ? AMSchedulerEventType.S_NODE_BLACKLISTED
         : AMSchedulerEventType.S_NODE_UNBLACKLISTED));
     this.nodeId = nodeId;
+    this.schedulerId = schedulerId;
   }
 
   public NodeId getNodeId() {
     return this.nodeId;
   }
+
+  public int getSchedulerId() {
+    return schedulerId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
index 90e76b7..2ace642 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
@@ -26,14 +26,16 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
 
   private final TaskAttempt attempt;
   private final ContainerId containerId;
-  private TaskAttemptState state;
+  private final TaskAttemptState state;
+  private final int schedulerId;
 
   public AMSchedulerEventTAEnded(TaskAttempt attempt, ContainerId containerId,
-      TaskAttemptState state) {
+      TaskAttemptState state, int schedulerId) {
     super(AMSchedulerEventType.S_TA_ENDED);
     this.attempt = attempt;
     this.containerId = containerId;
     this.state = state;
+    this.schedulerId = schedulerId;
   }
 
   public TezTaskAttemptID getAttemptID() {
@@ -51,4 +53,8 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
   public ContainerId getUsedContainerId() {
     return this.containerId;
   }
+
+  public int getSchedulerId() {
+    return schedulerId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index 51d8b9d..72a074f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -63,10 +64,11 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
   final int appHostPort;
   final String appTrackingUrl;
   final AppContext appContext;
+  final long customContainerAppId;
 
   public LocalTaskSchedulerService(TaskSchedulerAppCallback appClient,
       ContainerSignatureMatcher containerSignatureMatcher, String appHostName,
-      int appHostPort, String appTrackingUrl, AppContext appContext) {
+      int appHostPort, String appTrackingUrl, long customContainerAppId, AppContext appContext) {
     super(LocalTaskSchedulerService.class.getName());
     this.realAppClient = appClient;
     this.appCallbackExecutor = createAppCallbackExecutorService();
@@ -78,6 +80,7 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
     this.appContext = appContext;
     taskRequestQueue = new PriorityBlockingQueue<TaskRequest>();
     taskAllocations = new LinkedHashMap<Object, Container>();
+    this.customContainerAppId = customContainerAppId;
   }
 
   private ExecutorService createAppCallbackExecutorService() {
@@ -164,7 +167,7 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
 
   protected AsyncDelegateRequestHandler createRequestHandler(Configuration conf) {
     return new AsyncDelegateRequestHandler(taskRequestQueue,
-        new LocalContainerFactory(appContext),
+        new LocalContainerFactory(appContext, customContainerAppId),
         taskAllocations,
         appClientDelegate,
         conf);
@@ -195,17 +198,19 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
   }
 
   static class LocalContainerFactory {
-    final AppContext appContext;
     AtomicInteger nextId;
+    final ApplicationAttemptId customAppAttemptId;
 
-    public LocalContainerFactory(AppContext appContext) {
-      this.appContext = appContext;
+    public LocalContainerFactory(AppContext appContext, long appIdLong) {
       this.nextId = new AtomicInteger(1);
+      ApplicationId appId = ApplicationId
+          .newInstance(appIdLong, appContext.getApplicationAttemptId().getApplicationId().getId());
+      this.customAppAttemptId = ApplicationAttemptId
+          .newInstance(appId, appContext.getApplicationAttemptId().getAttemptId());
     }
 
     public Container createContainer(Resource capability, Priority priority) {
-      ApplicationAttemptId appAttemptId = appContext.getApplicationAttemptId();
-      ContainerId containerId = ContainerId.newInstance(appAttemptId, nextId.getAndIncrement());
+      ContainerId containerId = ContainerId.newInstance(customAppAttemptId, nextId.getAndIncrement());
       NodeId nodeId = NodeId.newInstance("127.0.0.1", 0);
       String nodeHttpAddress = "127.0.0.1:0";
 

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
index 8bdeb28..f86894f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
@@ -28,13 +28,15 @@ public class NMCommunicatorEvent extends AbstractEvent<NMCommunicatorEventType>
   private final ContainerId containerId;
   private final NodeId nodeId;
   private final Token containerToken;
+  private final int launcherId;
 
   public NMCommunicatorEvent(ContainerId containerId, NodeId nodeId,
-      Token containerToken, NMCommunicatorEventType type) {
+      Token containerToken, NMCommunicatorEventType type, int launcherId) {
     super(type);
     this.containerId = containerId;
     this.nodeId = nodeId;
     this.containerToken = containerToken;
+    this.launcherId = launcherId;
   }
 
   public ContainerId getContainerId() {
@@ -48,10 +50,14 @@ public class NMCommunicatorEvent extends AbstractEvent<NMCommunicatorEventType>
   public Token getContainerToken() {
     return this.containerToken;
   }
-  
+
+  public int getLauncherId() {
+    return launcherId;
+  }
+
   public String toSrting() {
     return super.toString() + " for container " + containerId + ", nodeId: "
-        + nodeId;
+        + nodeId + ", launcherId: " + launcherId;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
index c3b12c0..a38345c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
@@ -25,13 +25,16 @@ public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent {
 
   private final ContainerLaunchContext clc;
   private final Container container;
+  // The task communicator index for the specific container being launched.
+  private final int taskCommId;
 
   public NMCommunicatorLaunchRequestEvent(ContainerLaunchContext clc,
-      Container container) {
+      Container container, int launcherId, int taskCommId) {
     super(container.getId(), container.getNodeId(), container
-        .getContainerToken(), NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST);
+        .getContainerToken(), NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST, launcherId);
     this.clc = clc;
     this.container = container;
+    this.taskCommId = taskCommId;
   }
 
   public ContainerLaunchContext getContainerLaunchContext() {
@@ -42,6 +45,10 @@ public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent {
     return container;
   }
 
+  public int getTaskCommId() {
+    return taskCommId;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
index 277d1e7..c9b5c44 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
@@ -25,9 +25,9 @@ import org.apache.hadoop.yarn.api.records.Token;
 public class NMCommunicatorStopRequestEvent extends NMCommunicatorEvent {
 
   public NMCommunicatorStopRequestEvent(ContainerId containerId, NodeId nodeId,
-      Token containerToken) {
+      Token containerToken, int launcherId) {
     super(containerId, nodeId, containerToken,
-        NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
+        NMCommunicatorEventType.CONTAINER_STOP_REQUEST, launcherId);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 72389e7..5a0ace8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -108,9 +108,22 @@ public class TaskSchedulerEventHandler extends AbstractService
   private final String[] taskSchedulerClasses;
   protected final TaskSchedulerService []taskSchedulers;
 
+  private final boolean isPureLocalMode;
+  // If running in non local-only mode, the YARN task scheduler will always run to take care of
+  // registration with YARN and heartbeats to YARN.
+  // Splitting registration and heartbeats is not straigh-forward due to the taskScheduler being
+  // tied to a ContainerRequestType.
+  private final int yarnTaskSchedulerIndex;
+  // Custom AppIds to avoid container conflicts if there's multiple sources
+  private final long SCHEDULER_APP_ID_BASE = 111101111;
+  private final long SCHEDULER_APP_ID_INCREMENT = 111111111;
+
   BlockingQueue<AMSchedulerEvent> eventQueue
                               = new LinkedBlockingQueue<AMSchedulerEvent>();
 
+  // Not tracking container / task to schedulerId. Instead relying on everything flowing through
+  // the system and being propagated back via events.
+
   /**
    *
    * @param appContext
@@ -125,7 +138,7 @@ public class TaskSchedulerEventHandler extends AbstractService
   public TaskSchedulerEventHandler(AppContext appContext,
       DAGClientServer clientService, EventHandler eventHandler, 
       ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI,
-      String [] schedulerClasses) {
+      String [] schedulerClasses, boolean isPureLocalMode) {
     super(TaskSchedulerEventHandler.class.getName());
     this.appContext = appContext;
     this.eventHandler = eventHandler;
@@ -133,13 +146,39 @@ public class TaskSchedulerEventHandler extends AbstractService
     this.containerSignatureMatcher = containerSignatureMatcher;
     this.webUI = webUI;
     this.historyUrl = getHistoryUrl();
+    this.isPureLocalMode = isPureLocalMode;
     if (this.webUI != null) {
       this.webUI.setHistoryUrl(this.historyUrl);
     }
-    if (schedulerClasses == null || schedulerClasses.length == 0) {
-      this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+
+    // Override everything for pure local mode
+    if (isPureLocalMode) {
+      this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT};
+      this.yarnTaskSchedulerIndex = -1;
     } else {
-      this.taskSchedulerClasses = schedulerClasses;
+      if (schedulerClasses == null || schedulerClasses.length ==0) {
+        this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+        this.yarnTaskSchedulerIndex = 0;
+      } else {
+        // Ensure the YarnScheduler will be setup and note it's index. This will be responsible for heartbeats and YARN registration.
+        int foundYarnTaskSchedulerIndex = -1;
+        for (int i = 0 ; i < schedulerClasses.length ; i++) {
+          if (schedulerClasses[i].equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
+            foundYarnTaskSchedulerIndex = i;
+            break;
+          }
+        }
+        if (foundYarnTaskSchedulerIndex == -1) { // Not found. Add at the end.
+          this.taskSchedulerClasses = new String[schedulerClasses.length+1];
+          foundYarnTaskSchedulerIndex = this.taskSchedulerClasses.length -1;
+          for (int i = 0 ; i < schedulerClasses.length ; i++) { // Copy over the rest.
+            this.taskSchedulerClasses[i] = schedulerClasses[i];
+          }
+        } else {
+          this.taskSchedulerClasses = schedulerClasses;
+        }
+        this.yarnTaskSchedulerIndex = foundYarnTaskSchedulerIndex;
+      }
     }
     taskSchedulers = new TaskSchedulerService[this.taskSchedulerClasses.length];
   }
@@ -157,12 +196,12 @@ public class TaskSchedulerEventHandler extends AbstractService
     return cachedNodeCount;
   }
   
-  public Resource getAvailableResources() {
-    return taskSchedulers[0].getAvailableResources();
+  public Resource getAvailableResources(int schedulerId) {
+    return taskSchedulers[schedulerId].getAvailableResources();
   }
 
-  public Resource getTotalResources() {
-    return taskSchedulers[0].getTotalResources();
+  public Resource getTotalResources(int schedulerId) {
+    return taskSchedulers[schedulerId].getTotalResources();
   }
 
   public synchronized void handleEvent(AMSchedulerEvent sEvent) {
@@ -176,7 +215,7 @@ public class TaskSchedulerEventHandler extends AbstractService
       switch(event.getState()) {
       case FAILED:
       case KILLED:
-        handleTAUnsuccessfulEnd((AMSchedulerEventTAEnded) sEvent);
+        handleTAUnsuccessfulEnd(event);
         break;
       case SUCCEEDED:
         handleTASucceeded(event);
@@ -228,9 +267,9 @@ public class TaskSchedulerEventHandler extends AbstractService
 
   private void handleNodeBlacklistUpdate(AMSchedulerEventNodeBlacklistUpdate event) {
     if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
-      taskSchedulers[0].blacklistNode(event.getNodeId());
+      taskSchedulers[event.getSchedulerId()].blacklistNode(event.getNodeId());
     } else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) {
-      taskSchedulers[0].unblacklistNode(event.getNodeId());
+      taskSchedulers[event.getSchedulerId()].unblacklistNode(event.getNodeId());
     } else {
       throw new TezUncheckedException("Invalid event type: " + event.getType());
     }
@@ -242,14 +281,14 @@ public class TaskSchedulerEventHandler extends AbstractService
     // TODO what happens to the task that was connected to this container?
     // current assumption is that it will eventually call handleTaStopRequest
     //TaskAttempt taskAttempt = (TaskAttempt)
-    taskSchedulers[0].deallocateContainer(containerId);
+    taskSchedulers[event.getSchedulerId()].deallocateContainer(containerId);
     // TODO does this container need to be stopped via C_STOP_REQUEST
     sendEvent(new AMContainerEventStopRequest(containerId));
   }
 
   private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
     TaskAttempt attempt = event.getAttempt();
-    boolean wasContainerAllocated = taskSchedulers[0].deallocateTask(attempt, false);
+    boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt, false);
     // use stored value of container id in case the scheduler has removed this
     // assignment because the task has been deallocated earlier.
     // retroactive case
@@ -291,7 +330,8 @@ public class TaskSchedulerEventHandler extends AbstractService
           event.getAttemptID()));
     }
 
-    boolean wasContainerAllocated = taskSchedulers[0].deallocateTask(attempt, true);
+    boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt,
+        true);
     if (!wasContainerAllocated) {
       LOG.error("De-allocated successful task: " + attempt.getID()
           + ", but TaskScheduler reported no container assigned to task");
@@ -316,7 +356,7 @@ public class TaskSchedulerEventHandler extends AbstractService
         TaskAttempt affinityAttempt = vertex.getTask(taskIndex).getSuccessfulAttempt();
         if (affinityAttempt != null) {
           Preconditions.checkNotNull(affinityAttempt.getAssignedContainerID(), affinityAttempt.getID());
-          taskSchedulers[0].allocateTask(taskAttempt,
+          taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt,
               event.getCapability(),
               affinityAttempt.getAssignedContainerID(),
               Priority.newInstance(event.getPriority()),
@@ -336,7 +376,7 @@ public class TaskSchedulerEventHandler extends AbstractService
       }
     }
 
-    taskSchedulers[0].allocateTask(taskAttempt,
+    taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt,
         event.getCapability(),
         hosts,
         racks,
@@ -347,7 +387,8 @@ public class TaskSchedulerEventHandler extends AbstractService
 
   private TaskSchedulerService createTaskScheduler(String host, int port, String trackingUrl,
                                                    AppContext appContext,
-                                                   String schedulerClassName) {
+                                                   String schedulerClassName,
+                                                   long customAppIdIdentifier) {
     if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
       LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
       return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
@@ -355,7 +396,7 @@ public class TaskSchedulerEventHandler extends AbstractService
     } else if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
       LOG.info("Creating TaskScheduler: Local TaskScheduler");
       return new LocalTaskSchedulerService(this, this.containerSignatureMatcher,
-          host, port, trackingUrl, appContext);
+          host, port, trackingUrl, customAppIdIdentifier, appContext);
     } else {
       LOG.info("Creating custom TaskScheduler: " + schedulerClassName);
       // TODO TEZ-2003 Temporary reflection with specific parameters. Remove once there is a clean interface.
@@ -364,9 +405,10 @@ public class TaskSchedulerEventHandler extends AbstractService
       try {
         Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz
             .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class,
-                int.class, String.class, Configuration.class);
+                int.class, String.class, long.class, Configuration.class);
         ctor.setAccessible(true);
-        return ctor.newInstance(this, appContext, host, port, trackingUrl, getConfig());
+        return ctor.newInstance(this, appContext, host, port, trackingUrl, customAppIdIdentifier,
+            getConfig());
       } catch (NoSuchMethodException e) {
         throw new TezUncheckedException(e);
       } catch (InvocationTargetException e) {
@@ -381,10 +423,19 @@ public class TaskSchedulerEventHandler extends AbstractService
 
   @VisibleForTesting
   protected void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) {
+    // TODO Add error checking for components being used in the Vertex when running in pure local mode.
     // Iterate over the list and create all the taskSchedulers
+    int j = 0;
     for (int i = 0; i < taskSchedulerClasses.length; i++) {
+      long customAppIdIdentifier;
+      if (isPureLocalMode || taskSchedulerClasses[i].equals(
+          TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { // Use the app identifier from the appId.
+        customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp();
+      } else {
+        customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT);
+      }
       taskSchedulers[i] = createTaskScheduler(host, port,
-          trackingUrl, appContext, taskSchedulerClasses[i]);
+          trackingUrl, appContext, taskSchedulerClasses[i], customAppIdIdentifier);
     }
   }
 
@@ -403,12 +454,12 @@ public class TaskSchedulerEventHandler extends AbstractService
     for (int i = 0 ; i < taskSchedulers.length ; i++) {
       taskSchedulers[i].init(getConfig());
       taskSchedulers[i].start();
-    }
-
-    // TODO TEZ-2118 Start using multiple task schedulers
-    if (shouldUnregisterFlag.get()) {
-      // Flag may have been set earlier when task scheduler was not initialized
-      taskSchedulers[0].setShouldUnregister();
+      if (shouldUnregisterFlag.get()) {
+        // Flag may have been set earlier when task scheduler was not initialized
+        // TODO TEZ-2003 Should setRegister / unregister be part of APIs when not YARN specific ?
+        // External services could need to talk to some other entity.
+        taskSchedulers[i].setShouldUnregister();
+      }
     }
 
     this.eventHandlingThread = new Thread("TaskSchedulerEventHandlerThread") {
@@ -457,8 +508,10 @@ public class TaskSchedulerEventHandler extends AbstractService
       if (eventHandlingThread != null)
         eventHandlingThread.interrupt();
     }
-    if (taskSchedulers[0] != null) {
-      ((AbstractService)taskSchedulers[0]).stop();
+    for (int i = 0 ; i < taskSchedulers.length ; i++) {
+      if (taskSchedulers[i] != null) {
+        taskSchedulers[i].stop();
+      }
     }
   }
 
@@ -467,15 +520,18 @@ public class TaskSchedulerEventHandler extends AbstractService
   public synchronized void taskAllocated(Object task,
                                            Object appCookie,
                                            Container container) {
+    AMSchedulerEventTALaunchRequest event =
+        (AMSchedulerEventTALaunchRequest) appCookie;
     ContainerId containerId = container.getId();
-    if (appContext.getAllContainers().addContainerIfNew(container)) {
+    if (appContext.getAllContainers()
+        .addContainerIfNew(container, event.getSchedulerId(), event.getLauncherId(),
+            event.getTaskCommId())) {
       appContext.getNodeTracker().nodeSeen(container.getNodeId());
       sendEvent(new AMNodeEventContainerAllocated(container
           .getNodeId(), container.getId()));
     }
 
-    AMSchedulerEventTALaunchRequest event =
-                         (AMSchedulerEventTALaunchRequest) appCookie;
+
     TaskAttempt taskAttempt = event.getTaskAttempt();
     // TODO - perhaps check if the task still needs this container
     // because the deallocateTask downcall may have raced with the
@@ -484,7 +540,7 @@ public class TaskSchedulerEventHandler extends AbstractService
  
     if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) {
       sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(),
-          event.getContainerContext()));
+          event.getContainerContext(), event.getLauncherId(), event.getTaskCommId()));
     }
     sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container));
     sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(),
@@ -603,6 +659,9 @@ public class TaskSchedulerEventHandler extends AbstractService
   public float getProgress() {
     // at this point allocate has been called and so node count must be available
     // may change after YARN-1722
+    // This is a heartbeat in from the scheduler into the APP, and is being used to piggy-back and
+    // node updates from the cluster.
+    // TODO Handle this in TEZ-2124. Need a way to know which scheduler is calling in.
     int nodeCount = taskSchedulers[0].getClusterNodeCount();
     if (nodeCount != cachedNodeCount) {
       cachedNodeCount = nodeCount;
@@ -618,7 +677,9 @@ public class TaskSchedulerEventHandler extends AbstractService
   }
 
   public void dagCompleted() {
-    taskSchedulers[0].dagComplete();
+    for (int i = 0 ; i < taskSchedulers.length ; i++) {
+      taskSchedulers[i].dagComplete();
+    }
   }
 
   public void dagSubmitted() {
@@ -628,7 +689,10 @@ public class TaskSchedulerEventHandler extends AbstractService
 
   @Override
   public void preemptContainer(ContainerId containerId) {
-    taskSchedulers[0].deallocateContainer(containerId);
+    // TODO Why is this making a call back into the scheduler, when the call is originating from there.
+    // An AMContainer instance should already exist if an attempt is being made to preempt it
+    AMContainer amContainer = appContext.getAllContainers().get(containerId);
+    taskSchedulers[amContainer.getTaskSchedulerIdentifier()].deallocateContainer(containerId);
     // Inform the Containers about completion.
     sendEvent(new AMContainerEventCompleted(containerId, ContainerExitStatus.INVALID,
         "Container preempted internally", TaskAttemptTerminationCause.INTERNAL_PREEMPTION));
@@ -637,13 +701,24 @@ public class TaskSchedulerEventHandler extends AbstractService
   public void setShouldUnregisterFlag() {
     LOG.info("TaskScheduler notified that it should unregister from RM");
     this.shouldUnregisterFlag.set(true);
-    if (this.taskSchedulers[0] != null) {
-      this.taskSchedulers[0].setShouldUnregister();
+    for (int i = 0 ; i < taskSchedulers.length ; i++) {
+      if (this.taskSchedulers[i] != null) {
+        // TODO TEZ-2003 registration required for all schedulers ?
+        this.taskSchedulers[i].setShouldUnregister();
+      }
     }
   }
 
   public boolean hasUnregistered() {
-    return this.taskSchedulers[0].hasUnregistered();
+    boolean result = true;
+    for (int i = 0 ; i < taskSchedulers.length ; i++) {
+      // TODO TEZ-2003 registration required for all schedulers ?
+      result |= this.taskSchedulers[i].hasUnregistered();
+      if (result == false) {
+        return result;
+      }
+    }
+    return result;
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
index 0fc2e12..6616896 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
@@ -34,4 +34,7 @@ public interface AMContainer extends EventHandler<AMContainerEvent>{
   public List<TezTaskAttemptID> getAllTaskAttempts();
   public TezTaskAttemptID getCurrentTaskAttempt();
 
+  public int getTaskSchedulerIdentifier();
+  public int getContainerLauncherIdentifier();
+  public int getTaskCommunicatorIdentifier();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
index d973264..92e5817 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
@@ -27,12 +27,17 @@ public class AMContainerEventLaunchRequest extends AMContainerEvent {
 
   private final TezVertexID vertexId;
   private final ContainerContext containerContext;
+  private final int launcherId;
+  private final int taskCommId;
 
   public AMContainerEventLaunchRequest(ContainerId containerId,
-      TezVertexID vertexId, ContainerContext containerContext) {
+      TezVertexID vertexId, ContainerContext containerContext,
+      int launcherId, int taskCommId) {
     super(containerId, AMContainerEventType.C_LAUNCH_REQUEST);
     this.vertexId = vertexId;
     this.containerContext = containerContext;
+    this.launcherId = launcherId;
+    this.taskCommId = taskCommId;
   }
 
   public TezDAGID getDAGId() {
@@ -46,4 +51,12 @@ public class AMContainerEventLaunchRequest extends AMContainerEvent {
   public ContainerContext getContainerContext() {
     return this.containerContext;
   }
+
+  public int getLauncherId() {
+    return launcherId;
+  }
+
+  public int getTaskCommId() {
+    return taskCommId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 1acec9c..39df2e8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -81,6 +81,9 @@ public class AMContainerImpl implements AMContainer {
   private final TaskAttemptListener taskAttemptListener;
   protected final EventHandler eventHandler;
   private final ContainerSignatureMatcher signatureMatcher;
+  private final int schedulerId;
+  private final int launcherId;
+  private final int taskCommId;
 
   private final List<TezTaskAttemptID> completedAttempts =
       new LinkedList<TezTaskAttemptID>();
@@ -302,7 +305,7 @@ public class AMContainerImpl implements AMContainer {
   // additional change - JvmID, YarnChild, etc depend on TaskType.
   public AMContainerImpl(Container container, ContainerHeartbeatHandler chh,
       TaskAttemptListener tal, ContainerSignatureMatcher signatureMatcher,
-      AppContext appContext) {
+      AppContext appContext, int schedulerId, int launcherId, int taskCommId) {
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
@@ -314,6 +317,9 @@ public class AMContainerImpl implements AMContainer {
     this.containerHeartbeatHandler = chh;
     this.taskAttemptListener = tal;
     this.failedAssignments = new LinkedList<TezTaskAttemptID>();
+    this.schedulerId = schedulerId;
+    this.launcherId = launcherId;
+    this.taskCommId = taskCommId;
     this.stateMachine = stateMachineFactory.make(this);
   }
 
@@ -363,6 +369,21 @@ public class AMContainerImpl implements AMContainer {
     }
   }
 
+  @Override
+  public int getTaskSchedulerIdentifier() {
+    return this.schedulerId;
+  }
+
+  @Override
+  public int getContainerLauncherIdentifier() {
+    return this.launcherId;
+  }
+
+  @Override
+  public int getTaskCommunicatorIdentifier() {
+    return this.taskCommId;
+  }
+
   public boolean isInErrorState() {
     return inError;
   }
@@ -432,7 +453,7 @@ public class AMContainerImpl implements AMContainer {
           containerContext.getLocalResources(),
           containerContext.getEnvironment(),
           containerContext.getJavaOpts(),
-          container.taskAttemptListener.getAddress(), containerContext.getCredentials(),
+          container.taskAttemptListener.getTaskCommunicator(container.taskCommId).getAddress(), containerContext.getCredentials(),
           container.appContext, container.container.getResource(),
           container.appContext.getAMConf());
 
@@ -1014,7 +1035,7 @@ public class AMContainerImpl implements AMContainer {
   }
   
   protected void deAllocate() {
-    sendEvent(new AMSchedulerEventDeallocateContainer(containerId));
+    sendEvent(new AMSchedulerEventDeallocateContainer(containerId, schedulerId));
   }
 
   protected void sendTerminatedToTaskAttempt(
@@ -1044,28 +1065,28 @@ public class AMContainerImpl implements AMContainer {
   }
 
   protected void sendStartRequestToNM(ContainerLaunchContext clc) {
-    sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container));
+    sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container, launcherId, taskCommId));
   }
 
   protected void sendStopRequestToNM() {
     sendEvent(new NMCommunicatorStopRequestEvent(containerId,
-        container.getNodeId(), container.getContainerToken()));
+        container.getNodeId(), container.getContainerToken(), launcherId));
   }
 
   protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId) {
-    taskAttemptListener.unregisterTaskAttempt(attemptId);
+    taskAttemptListener.unregisterTaskAttempt(attemptId, taskCommId);
   }
 
   protected void registerAttemptWithListener(AMContainerTask amContainerTask) {
-    taskAttemptListener.registerTaskAttempt(amContainerTask, this.containerId);
+    taskAttemptListener.registerTaskAttempt(amContainerTask, this.containerId, taskCommId);
   }
 
   protected void registerWithTAListener() {
-    taskAttemptListener.registerRunningContainer(containerId);
+    taskAttemptListener.registerRunningContainer(containerId, taskCommId);
   }
 
   protected void unregisterFromTAListener() {
-    this.taskAttemptListener.unregisterRunningContainer(containerId);
+    this.taskAttemptListener.unregisterRunningContainer(containerId, taskCommId);
   }
 
   protected void registerWithContainerListener() {

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
index 574c38e..938096d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
@@ -62,9 +62,9 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo
     }
   }
 
-  public boolean addContainerIfNew(Container container) {
+  public boolean addContainerIfNew(Container container, int schedulerId, int launcherId, int taskCommId) {
     AMContainer amc = new AMContainerImpl(container, chh, tal,
-      containerSignatureMatcher, context);
+      containerSignatureMatcher, context, schedulerId, launcherId, taskCommId);
     return (containerMap.putIfAbsent(container.getId(), amc) == null);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index b93cab3..0d8e4cd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -257,7 +257,8 @@ public class AMNodeImpl implements AMNode {
     // these containers are not useful anymore
     pastContainers.addAll(containers);
     containers.clear();
-    sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true));
+    // TODO TEZ-2124 node tracking per ext source
+    sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true, 0));
   }
 
   @SuppressWarnings("unchecked")
@@ -363,7 +364,8 @@ public class AMNodeImpl implements AMNode {
     public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
       node.ignoreBlacklisting = ignore;
       if (node.getState() == AMNodeState.BLACKLISTED) {
-        node.sendEvent(new AMSchedulerEventNodeBlacklistUpdate(node.getNodeId(), false));
+        // TODO TEZ-2124 node tracking per ext source
+        node.sendEvent(new AMSchedulerEventNodeBlacklistUpdate(node.getNodeId(), false, 0));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 26fc1ab..a466bc6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -192,7 +192,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
     @Override
     public void serviceStart() throws Exception {
       taListener = (TaskAttemptListenerImpTezDag) getTaskAttemptListener();
-      taskCommunicator = (TezTaskCommunicatorImpl) taListener.getTaskCommunicator();
+      taskCommunicator = (TezTaskCommunicatorImpl) taListener.getTaskCommunicator(0);
       eventHandlingThread = new Thread(this);
       eventHandlingThread.start();
       ExecutorService rawExecutor = Executors.newFixedThreadPool(handlerConcurrency,

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 16bd1d3..bffb5b9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -108,9 +108,16 @@ public class TestTaskAttemptListenerImplTezDag {
     doReturn(dag).when(appContext).getCurrentDAG();
     doReturn(appAcls).when(appContext).getApplicationACLs();
     doReturn(amContainerMap).when(appContext).getAllContainers();
+    NodeId nodeId = NodeId.newInstance("localhost", 0);
+
+    AMContainer amContainer = mock(AMContainer.class);
+    Container container = mock(Container.class);
+    doReturn(nodeId).when(container).getNodeId();
+    doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class));
+    doReturn(container).when(amContainer).getContainer();
 
     taskAttemptListener = new TaskAttemptListenerImplForTest(appContext,
-        mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null);
+        mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, null);
 
     taskSpec = mock(TaskSpec.class);
     doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID();
@@ -121,7 +128,7 @@ public class TestTaskAttemptListenerImplTezDag {
   @Test(timeout = 5000)
   public void testGetTask() throws IOException {
 
-    TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator();
+    TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(0);
     TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
 
     ContainerId containerId1 = createContainerId(appId, 1);
@@ -131,55 +138,55 @@ public class TestTaskAttemptListenerImplTezDag {
 
     ContainerId containerId2 = createContainerId(appId, 2);
     ContainerContext containerContext2 = new ContainerContext(containerId2.toString());
-    taskAttemptListener.registerRunningContainer(containerId2);
+    taskAttemptListener.registerRunningContainer(containerId2, 0);
     containerTask = tezUmbilical.getTask(containerContext2);
     assertNull(containerTask);
 
     // Valid task registered
-    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId2);
+    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId2, 0);
     containerTask = tezUmbilical.getTask(containerContext2);
     assertFalse(containerTask.shouldDie());
     assertEquals(taskSpec, containerTask.getTaskSpec());
 
     // Task unregistered. Should respond to heartbeats
-    taskAttemptListener.unregisterTaskAttempt(taskAttemptId);
+    taskAttemptListener.unregisterTaskAttempt(taskAttemptId, 0);
     containerTask = tezUmbilical.getTask(containerContext2);
     assertNull(containerTask);
 
     // Container unregistered. Should send a shouldDie = true
-    taskAttemptListener.unregisterRunningContainer(containerId2);
+    taskAttemptListener.unregisterRunningContainer(containerId2, 0);
     containerTask = tezUmbilical.getTask(containerContext2);
     assertTrue(containerTask.shouldDie());
 
     ContainerId containerId3 = createContainerId(appId, 3);
     ContainerContext containerContext3 = new ContainerContext(containerId3.toString());
-    taskAttemptListener.registerRunningContainer(containerId3);
+    taskAttemptListener.registerRunningContainer(containerId3, 0);
 
     // Register task to container3, followed by unregistering container 3 all together
     TaskSpec taskSpec2 = mock(TaskSpec.class);
     TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class);
     doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID();
     AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec, null, null, false, 0);
-    taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3);
-    taskAttemptListener.unregisterRunningContainer(containerId3);
+    taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3, 0);
+    taskAttemptListener.unregisterRunningContainer(containerId3, 0);
     containerTask = tezUmbilical.getTask(containerContext3);
     assertTrue(containerTask.shouldDie());
   }
 
   @Test(timeout = 5000)
   public void testGetTaskMultiplePulls() throws IOException {
-    TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator();
+    TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(0);
     TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
 
     ContainerId containerId1 = createContainerId(appId, 1);
     doReturn(mock(AMContainer.class)).when(amContainerMap).get(containerId1);
     ContainerContext containerContext1 = new ContainerContext(containerId1.toString());
-    taskAttemptListener.registerRunningContainer(containerId1);
+    taskAttemptListener.registerRunningContainer(containerId1, 0);
     containerTask = tezUmbilical.getTask(containerContext1);
     assertNull(containerTask);
 
     // Register task
-    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId1);
+    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId1, 0);
     containerTask = tezUmbilical.getTask(containerContext1);
     assertFalse(containerTask.shouldDie());
     assertEquals(taskSpec, containerTask.getTaskSpec());

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 60c4c88..9df225c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.MockDNSToSwitchMapping;
+import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -273,8 +274,9 @@ public class TestTaskAttempt {
 
     MockEventHandler eventHandler = new MockEventHandler();
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -323,8 +325,9 @@ public class TestTaskAttempt {
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -345,7 +348,7 @@ public class TestTaskAttempt {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
 
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();
@@ -424,8 +427,9 @@ public class TestTaskAttempt {
 
     MockEventHandler eventHandler = new MockEventHandler();
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -446,7 +450,7 @@ public class TestTaskAttempt {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
 
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();
@@ -489,8 +493,9 @@ public class TestTaskAttempt {
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -511,7 +516,7 @@ public class TestTaskAttempt {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
 
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();
@@ -581,8 +586,9 @@ public class TestTaskAttempt {
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -604,7 +610,7 @@ public class TestTaskAttempt {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
 
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();
@@ -712,8 +718,9 @@ public class TestTaskAttempt {
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -735,7 +742,7 @@ public class TestTaskAttempt {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
 
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();
@@ -804,8 +811,9 @@ public class TestTaskAttempt {
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -826,7 +834,7 @@ public class TestTaskAttempt {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
 
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();
@@ -899,8 +907,9 @@ public class TestTaskAttempt {
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -921,7 +930,7 @@ public class TestTaskAttempt {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
 
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();
@@ -1002,8 +1011,9 @@ public class TestTaskAttempt {
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -1024,7 +1034,7 @@ public class TestTaskAttempt {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
 
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();
@@ -1102,8 +1112,9 @@ public class TestTaskAttempt {
     MockEventHandler mockEh = new MockEventHandler();
     MockEventHandler eventHandler = spy(mockEh);
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -1124,7 +1135,7 @@ public class TestTaskAttempt {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
 
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 99ec6cf..df29eaa 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2176,7 +2176,7 @@ public class TestVertexImpl {
     doReturn(dagId).when(appContext).getCurrentDAGID();
     doReturn(dagId).when(dag).getID();
     doReturn(taskScheduler).when(appContext).getTaskScheduler();
-    doReturn(Resource.newInstance(102400, 60)).when(taskScheduler).getTotalResources();
+    doReturn(Resource.newInstance(102400, 60)).when(taskScheduler).getTotalResources(0);
     doReturn(historyEventHandler).when(appContext).getHistoryHandler();
     doReturn(dispatcher.getEventHandler()).when(appContext).getEventHandler();
 
@@ -2942,7 +2942,7 @@ public class TestVertexImpl {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appContext);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
     doReturn(containers).when(appContext).getAllContainers();
 
     ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
@@ -2977,7 +2977,7 @@ public class TestVertexImpl {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appContext);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
     doReturn(containers).when(appContext).getAllContainers();
 
     ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
@@ -3013,7 +3013,7 @@ public class TestVertexImpl {
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appContext);
-    containers.addContainerIfNew(container);
+    containers.addContainerIfNew(container, 0, 0, 0);
     doReturn(containers).when(appContext).getAllContainers();
 
     ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));