You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:42:58 UTC

[20/43] tez git commit: TEZ-2122. Setup pluggable components at AM/Vertex level. (sseth)

TEZ-2122. Setup pluggable components at AM/Vertex level. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cee48099
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cee48099
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cee48099

Branch: refs/heads/TEZ-2003
Commit: cee4809942dbdeafabc7e9442e17aab54c54fdba
Parents: 5f27b83
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Feb 19 14:59:18 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 8 14:41:01 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |   1 +
 .../apache/tez/dag/api/TezConfiguration.java    |  29 +++-
 .../org/apache/tez/dag/api/TezConstants.java    |   3 +
 .../java/org/apache/tez/dag/app/AppContext.java |   4 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 121 +++++++++++++-
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  77 +++++----
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   4 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |   8 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  47 ++++++
 .../app/launcher/ContainerLauncherRouter.java   |  93 +++++++----
 .../app/rm/AMSchedulerEventTALaunchRequest.java |  22 ++-
 .../dag/app/rm/TaskSchedulerEventHandler.java   | 163 +++++++++++--------
 .../apache/tez/dag/app/MockDAGAppMaster.java    |   5 +-
 .../app/TestTaskAttemptListenerImplTezDag.java  |  19 +--
 .../tez/dag/app/rm/TestContainerReuse.java      |   2 +-
 .../app/rm/TestTaskSchedulerEventHandler.java   |  12 +-
 .../dag/app/rm/TestTaskSchedulerHelpers.java    |  18 +-
 .../tez/tests/TestExternalTezServices.java      |  19 ++-
 18 files changed, 458 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/cee48099/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 1cd74a4..4bfe08f 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -3,5 +3,6 @@ ALL CHANGES:
   TEZ-2006. Task communication plane needs to be pluggable.
   TEZ-2090. Add tests for jobs running in external services.
   TEZ-2117. Add a manager for ContainerLaunchers running in the AM.
+  TEZ-2122. Setup pluggable components at AM/Vertex level.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/cee48099/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 1cd478e..1f5f157 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1168,13 +1168,36 @@ public class TezConfiguration extends Configuration {
       + "tez-ui.webservice.enable";
   public static final boolean TEZ_AM_WEBSERVICE_ENABLE_DEFAULT = true;
 
+  /** defaults container-launcher for the specific vertex */
   @ConfigurationScope(Scope.VERTEX)
-  public static final String TEZ_AM_CONTAINER_LAUNCHER_CLASS = TEZ_AM_PREFIX + "container-launcher.class";
+  public static final String TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME = TEZ_AM_PREFIX + "vertex.container-launcher.name";
+  /** defaults task-scheduler for the specific vertex */
   @ConfigurationScope(Scope.VERTEX)
-  public static final String TEZ_AM_TASK_SCHEDULER_CLASS = TEZ_AM_PREFIX + "task-scheduler.class";
+  public static final String TEZ_AM_VERTEX_TASK_SCHEDULER_NAME = TEZ_AM_PREFIX + "vertex.task-scheduler.name";
+  /** defaults task-communicator for the specific vertex */
   @ConfigurationScope(Scope.VERTEX)
-  public static final String TEZ_AM_TASK_COMMUNICATOR_CLASS = TEZ_AM_PREFIX + "task-communicator.class";
+  public static final String TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME = TEZ_AM_PREFIX + "vertex.task-communicator.name";
 
+  /** Comma separated list of named container-launcher classes running in the AM.
+   * The format for each entry is NAME:CLASSNAME, except for tez default which is specified as Tez
+   * e.g. Tez, ExtService:org.apache.ExtLauncherClasss
+   * */
+  @ConfigurationScope(Scope.AM)
+  public static final String TEZ_AM_CONTAINER_LAUNCHERS = TEZ_AM_PREFIX + "container-launchers";
+
+  /** Comma separated list of task-schedulers classes running in the AM.
+   * The format for each entry is NAME:CLASSNAME, except for tez default which is specified as Tez
+   * e.g. Tez, ExtService:org.apache.ExtSchedulerClasss
+   */
+  @ConfigurationScope(Scope.AM)
+  public static final String TEZ_AM_TASK_SCHEDULERS = TEZ_AM_PREFIX + "task-schedulers";
+
+  /** Comma separated list of task-communicators classes running in the AM.
+   * The format for each entry is NAME:CLASSNAME, except for tez default which is specified as Tez
+   * e.g. Tez, ExtService:org.apache.ExtTaskCommClass
+   * */
+   @ConfigurationScope(Scope.AM)
+  public static final String TEZ_AM_TASK_COMMUNICATORS = TEZ_AM_PREFIX + "task-communicators";
 
   // TODO only validate property here, value can also be validated if necessary
   public static void validateProperty(String property, Scope usedScope) {

http://git-wip-us.apache.org/repos/asf/tez/blob/cee48099/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
index bc4208f..3b07c59 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
@@ -102,4 +102,7 @@ public class TezConstants {
   /// Version-related Environment variables
   public static final String TEZ_CLIENT_VERSION_ENV = "TEZ_CLIENT_VERSION";
 
+
+  public static final String TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT = "Tez";
+  public static final String TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT = "TezLocal";
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/cee48099/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 37f7624..9463226 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -109,4 +109,8 @@ public interface AppContext {
   String getAMUser();
 
   Credentials getAppCredentials();
+
+  public Integer getTaskCommunicatorIdentifier(String name);
+  public Integer getTaskScheduerIdentifier(String name);
+  public Integer getContainerLauncherIdentifier(String name);
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/cee48099/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 0f4d812..6814cda 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -54,6 +54,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Options;
@@ -266,7 +268,12 @@ public class DAGAppMaster extends AbstractService {
   
   private ExecutorService rawExecutor;
   private ListeningExecutorService execService;
-  
+
+  // TODO May not need to be a bidi map
+  private final BiMap<String, Integer> taskSchedulers = HashBiMap.create();
+  private final BiMap<String, Integer> containerLaunchers = HashBiMap.create();
+  private final BiMap<String, Integer> taskCommunicators = HashBiMap.create();
+
   /**
    * set of already executed dag names.
    */
@@ -370,6 +377,29 @@ public class DAGAppMaster extends AbstractService {
     this.isLocal = conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
         TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
 
+    String tezDefaultClassIdentifier =
+        isLocal ? TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT :
+            TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT;
+
+    String[] taskSchedulerClassIdentifiers = parsePlugins(taskSchedulers,
+        conf.getTrimmedStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS,
+            tezDefaultClassIdentifier),
+        TezConfiguration.TEZ_AM_TASK_SCHEDULERS);
+
+    String[] containerLauncherClassIdentifiers = parsePlugins(containerLaunchers,
+        conf.getTrimmedStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS,
+            tezDefaultClassIdentifier),
+        TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS);
+
+    String[] taskCommunicatorClassIdentifiers = parsePlugins(taskCommunicators,
+        conf.getTrimmedStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS,
+            tezDefaultClassIdentifier),
+        TezConfiguration.TEZ_AM_TASK_COMMUNICATORS);
+
+    LOG.info(buildPluginComponentLog(taskSchedulerClassIdentifiers, taskSchedulers, "TaskSchedulers"));
+    LOG.info(buildPluginComponentLog(containerLauncherClassIdentifiers, containerLaunchers, "ContainerLaunchers"));
+    LOG.info(buildPluginComponentLog(taskCommunicatorClassIdentifiers, taskCommunicators, "TaskCommunicators"));
+
     boolean disableVersionCheck = conf.getBoolean(
         TezConfiguration.TEZ_AM_DISABLE_CLIENT_VERSION_CHECK,
         TezConfiguration.TEZ_AM_DISABLE_CLIENT_VERSION_CHECK_DEFAULT);
@@ -435,7 +465,7 @@ public class DAGAppMaster extends AbstractService {
 
     //service to handle requests to TaskUmbilicalProtocol
     taskAttemptListener = createTaskAttemptListener(context,
-        taskHeartbeatHandler, containerHeartbeatHandler);
+        taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorClassIdentifiers);
     addIfService(taskAttemptListener, true);
 
     containerSignatureMatcher = createContainerSignatureMatcher();
@@ -482,7 +512,8 @@ public class DAGAppMaster extends AbstractService {
     }
 
     this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
-        clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService);
+        clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService,
+        taskSchedulerClassIdentifiers);
     addIfService(taskSchedulerEventHandler, true);
 
     if (enableWebUIService()) {
@@ -500,7 +531,7 @@ public class DAGAppMaster extends AbstractService {
         taskSchedulerEventHandler);
     addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer);
 
-    this.containerLauncherRouter = createContainerLauncherRouter(conf);
+    this.containerLauncherRouter = createContainerLauncherRouter(conf, containerLauncherClassIdentifiers);
     addIfService(containerLauncherRouter, true);
     dispatcher.register(NMCommunicatorEventType.class, containerLauncherRouter);
 
@@ -1007,9 +1038,9 @@ public class DAGAppMaster extends AbstractService {
   }
 
   protected TaskAttemptListener createTaskAttemptListener(AppContext context,
-      TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh) {
+      TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, String[] taskCommunicatorClasses) {
     TaskAttemptListener lis =
-        new TaskAttemptListenerImpTezDag(context, thh, chh,jobTokenSecretManager);
+        new TaskAttemptListenerImpTezDag(context, thh, chh,jobTokenSecretManager, taskCommunicatorClasses);
     return lis;
   }
 
@@ -1030,9 +1061,9 @@ public class DAGAppMaster extends AbstractService {
     return chh;
   }
 
-  protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf) throws
+  protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf, String []containerLauncherClasses) throws
       UnknownHostException {
-    return  new ContainerLauncherRouter(conf, isLocal, context, taskAttemptListener, workingDirectory);
+    return  new ContainerLauncherRouter(conf, context, taskAttemptListener, workingDirectory, containerLauncherClasses);
 
   }
 
@@ -1459,6 +1490,21 @@ public class DAGAppMaster extends AbstractService {
     }
 
     @Override
+    public Integer getTaskCommunicatorIdentifier(String name) {
+      return taskCommunicators.get(name);
+    }
+
+    @Override
+    public Integer getTaskScheduerIdentifier(String name) {
+      return taskSchedulers.get(name);
+    }
+
+    @Override
+    public Integer getContainerLauncherIdentifier(String name) {
+      return taskCommunicators.get(name);
+    }
+
+    @Override
     public Map<ApplicationAccessType, String> getApplicationACLs() {
       if (getServiceState() != STATE.STARTED) {
         throw new TezUncheckedException(
@@ -2233,4 +2279,63 @@ public class DAGAppMaster extends AbstractService {
     return amConf.getBoolean(TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE,
         TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE_DEFAULT);
   }
+
+  // Tez default classnames are populated as TezConfiguration.TEZ_AM_SERVICE_PLUGINS_DEFAULT
+  private String[] parsePlugins(BiMap<String, Integer> pluginMap, String[] pluginStrings,
+                                   String context) {
+    Preconditions.checkState(pluginStrings != null && pluginStrings.length > 0,
+        "Plugin strings should not be null or empty: " + context);
+
+    String[] classNames = new String[pluginStrings.length];
+
+    int index = 0;
+    for (String pluginString : pluginStrings) {
+
+      String className;
+      String identifierString;
+
+      Preconditions.checkState(pluginString != null && !pluginString.isEmpty(),
+          "Plugin string: " + pluginString + " should not be null or empty");
+      if (pluginString.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT) ||
+          pluginString.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
+        // Kind of ugly, but Tez internal routing is encoded via a String instead of classnames.
+        // Individual components - TaskComm, Scheduler, Launcher deal with actual classname translation,
+        // and avoid reflection.
+        identifierString = pluginString;
+        className = pluginString;
+      } else {
+        String[] parts = pluginString.split(":");
+        Preconditions.checkState(
+            parts.length == 2 && parts[0] != null && !parts[0].isEmpty() && parts[1] != null &&
+                !parts[1].isEmpty(),
+            "Invalid configuration string for " + context + ": " + pluginString);
+        Preconditions.checkState(
+            !parts[0].equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT) &&
+                !parts[0].equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT),
+            "Identifier cannot be " + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT + " or " +
+                TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT + " for " +
+                pluginString);
+        identifierString = parts[0];
+        className = parts[1];
+      }
+      pluginMap.put(identifierString, index);
+      classNames[index] = className;
+    }
+    return classNames;
+  }
+
+  String buildPluginComponentLog(String[] classIdentifiers, BiMap<String, Integer> map,
+                                 String component) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("AM Level configured ").append(component).append(": ");
+    for (int i = 0; i < classIdentifiers.length; i++) {
+      sb.append("[").append(i).append(":").append(map.inverse().get(i)).append(":")
+          .append(taskSchedulers.inverse().get(i)).append(
+          "]");
+      if (i != classIdentifiers.length - 1) {
+        sb.append(",");
+      }
+    }
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/cee48099/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 0d9dc31..2f6dcf5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -38,7 +38,6 @@ import org.apache.tez.runtime.api.impl.EventType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -47,7 +46,7 @@ import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
 import org.apache.tez.dag.api.TaskHeartbeatResponse;
-import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -57,7 +56,6 @@ import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
-import org.apache.tez.dag.app.rm.TaskSchedulerService;
 import org.apache.tez.dag.app.rm.container.AMContainerTask;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezVertexID;
@@ -74,7 +72,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
       .getLogger(TaskAttemptListenerImpTezDag.class);
 
   private final AppContext context;
-  private TaskCommunicator taskCommunicator;
+  private final TaskCommunicator[] taskCommunicators;
 
   protected final TaskHeartbeatHandler taskHeartbeatHandler;
   protected final ContainerHeartbeatHandler containerHeartbeatHandler;
@@ -100,28 +98,52 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   public TaskAttemptListenerImpTezDag(AppContext context,
                                       TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
                                       // TODO TEZ-2003 pre-merge. Remove reference to JobTokenSecretManager.
-                                      JobTokenSecretManager jobTokenSecretManager) {
+                                      JobTokenSecretManager jobTokenSecretManager,
+                                      String [] taskCommunicatorClassIdentifiers) {
     super(TaskAttemptListenerImpTezDag.class.getName());
     this.context = context;
     this.taskHeartbeatHandler = thh;
     this.containerHeartbeatHandler = chh;
-    this.taskCommunicator = new TezTaskCommunicatorImpl(this);
+    if (taskCommunicatorClassIdentifiers == null || taskCommunicatorClassIdentifiers.length == 0) {
+      taskCommunicatorClassIdentifiers = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+    }
+    this.taskCommunicators = new TaskCommunicator[taskCommunicatorClassIdentifiers.length];
+    for (int i = 0 ; i < taskCommunicatorClassIdentifiers.length ; i++) {
+      taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i]);
+    }
+    // TODO TEZ-2118 Start using taskCommunicator indices properly
+  }
+
+  @Override
+  public void serviceStart() {
+    // TODO Why is init tied to serviceStart
+    for (int i = 0 ; i < taskCommunicators.length ; i++) {
+      taskCommunicators[i].init(getConfig());
+      taskCommunicators[i].start();
+    }
   }
 
   @Override
-  public void serviceInit(Configuration conf) {
-    String taskCommClassName = conf.get(TezConfiguration.TEZ_AM_TASK_COMMUNICATOR_CLASS);
-    if (taskCommClassName == null) {
+  public void serviceStop() {
+    for (int i = 0 ; i < taskCommunicators.length ; i++) {
+      taskCommunicators[i].stop();
+    }
+  }
+
+  private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier) {
+    if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT) ||
+        taskCommClassIdentifier
+            .equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
       LOG.info("Using Default Task Communicator");
-      this.taskCommunicator = new TezTaskCommunicatorImpl(this);
+      return new TezTaskCommunicatorImpl(this);
     } else {
-      LOG.info("Using TaskCommunicator: " + taskCommClassName);
+      LOG.info("Using TaskCommunicator: " + taskCommClassIdentifier);
       Class<? extends TaskCommunicator> taskCommClazz = (Class<? extends TaskCommunicator>) ReflectionUtils
-          .getClazz(taskCommClassName);
+          .getClazz(taskCommClassIdentifier);
       try {
         Constructor<? extends TaskCommunicator> ctor = taskCommClazz.getConstructor(TaskCommunicatorContext.class);
         ctor.setAccessible(true);
-        this.taskCommunicator = ctor.newInstance(this);
+        return ctor.newInstance(this);
       } catch (NoSuchMethodException e) {
         throw new TezUncheckedException(e);
       } catch (InvocationTargetException e) {
@@ -135,20 +157,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   }
 
   @Override
-  public void serviceStart() {
-    taskCommunicator.init(getConfig());
-    taskCommunicator.start();
-  }
-
-  @Override
-  public void serviceStop() {
-    if (taskCommunicator != null) {
-      taskCommunicator.stop();
-      taskCommunicator = null;
-    }
-  }
-
-  @Override
   public ApplicationAttemptId getApplicationAttemptId() {
     return context.getApplicationAttemptId();
   }
@@ -236,7 +244,8 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
 
   @Override
   public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) {
-    context.getEventHandler().handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null));
+    context.getEventHandler()
+        .handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null));
     pingContainerHeartbeatHandler(containerId);
   }
 
@@ -266,7 +275,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
 
   @Override
   public InetSocketAddress getAddress() {
-    return taskCommunicator.getAddress();
+    return taskCommunicators[0].getAddress();
   }
 
   // The TaskAttemptListener register / unregister methods in this class are not thread safe.
@@ -298,7 +307,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
           "Multiple registrations for containerId: " + containerId);
     }
     NodeId nodeId = context.getAllContainers().get(containerId).getContainer().getNodeId();
-    taskCommunicator.registerRunningContainer(containerId, nodeId.getHost(), nodeId.getPort());
+    taskCommunicators[0].registerRunningContainer(containerId, nodeId.getHost(), nodeId.getPort());
   }
 
   @Override
@@ -310,7 +319,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     if (containerInfo.taskAttemptId != null) {
       registeredAttempts.remove(containerInfo.taskAttemptId);
     }
-    taskCommunicator.registerContainerEnd(containerId);
+    taskCommunicators[0].registerContainerEnd(containerId);
   }
 
   @Override
@@ -345,7 +354,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
           + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
           + " when already assigned to: " + containerIdFromMap);
     }
-    taskCommunicator.registerRunningTaskAttempt(containerId, amContainerTask.getTask(),
+    taskCommunicators[0].registerRunningTaskAttempt(containerId, amContainerTask.getTask(),
         amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(),
         amContainerTask.haveCredentialsChanged());
   }
@@ -365,7 +374,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
     // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
     registeredContainers.put(containerId, NULL_CONTAINER_INFO);
-    taskCommunicator.unregisterRunningTaskAttempt(attemptId);
+    taskCommunicators[0].unregisterRunningTaskAttempt(attemptId);
   }
 
   private void pingContainerHeartbeatHandler(ContainerId containerId) {
@@ -383,6 +392,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   }
 
   public TaskCommunicator getTaskCommunicator() {
-    return taskCommunicator;
+    return taskCommunicators[0];
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/cee48099/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index bb42392..8b60dc3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -173,4 +173,8 @@ public interface Vertex extends Comparable<Vertex> {
   public int getKilledTaskAttemptCount();
 
   public Configuration getConf();
+
+  public int getTaskSchedulerIdentifier();
+  public int getContainerLauncherIdentifier();
+  public int getTaskCommunicatorIdentifier();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/cee48099/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index b1c0acc..c18dc00 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1066,9 +1066,15 @@ public class TaskAttemptImpl implements TaskAttempt,
         priority = (scheduleEvent.getPriorityHighLimit() + scheduleEvent.getPriorityLowLimit()) / 2;
       }
 
+      // TODO Jira post TEZ-2003 getVertex implementation is very inefficient. This should be via references, instead of locked table lookups.
+      Vertex vertex = ta.getVertex();
       AMSchedulerEventTALaunchRequest launchRequestEvent = new AMSchedulerEventTALaunchRequest(
           ta.attemptId, ta.taskResource, remoteTaskSpec, ta, locationHint,
-          priority, ta.containerContext);
+          priority, ta.containerContext,
+          vertex.getTaskSchedulerIdentifier(),
+          vertex.getContainerLauncherIdentifier(),
+          vertex.getTaskCommunicatorIdentifier());
+
       ta.sendEvent(launchRequestEvent);
       return TaskAttemptStateInternal.START_WAIT;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/cee48099/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 6b208b0..097cf3d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -73,6 +73,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.api.Scope;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.TaskLocationHint;
@@ -230,6 +231,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   @VisibleForTesting
   public boolean useOnDemandRouting = true;
 
+  private final int taskSchedulerIdentifier;
+  private final int containerLauncherIdentifier;
+  private final int taskCommunicatorIdentifier;
+
   //fields initialized in init
 
   @VisibleForTesting
@@ -959,6 +964,33 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
 
+    boolean isLocal = vertexConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
+        TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
+
+    String tezDefaultComponentName =
+        isLocal ? TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT :
+            TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT;
+    String taskSchedulerName =
+        vertexConf.get(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, tezDefaultComponentName);
+    String taskCommName = vertexConf
+        .get(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, tezDefaultComponentName);
+    String containerLauncherName = vertexConf
+        .get(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, tezDefaultComponentName);
+    taskSchedulerIdentifier = appContext.getTaskScheduerIdentifier(taskSchedulerName);
+    taskCommunicatorIdentifier = appContext.getTaskCommunicatorIdentifier(taskCommName);
+    containerLauncherIdentifier = appContext.getContainerLauncherIdentifier(containerLauncherName);
+
+    Preconditions.checkNotNull(taskSchedulerIdentifier, "Unknown taskScheduler: " + taskSchedulerName);
+    Preconditions.checkNotNull(taskCommunicatorIdentifier, "Unknown taskCommunicator: " + containerLauncherName);
+    Preconditions.checkNotNull(containerLauncherIdentifier, "Unknown containerLauncher: " + taskCommName);
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("Running vertex: ").append(logIdentifier).append(" : ")
+        .append("TaskScheduler=").append(taskSchedulerIdentifier).append(":").append(taskSchedulerName)
+        .append(", ContainerLauncher=").append(containerLauncherIdentifier).append(":").append(containerLauncherName)
+        .append(", TaskCommunicator=").append(taskCommunicatorIdentifier).append(":").append(taskCommName);
+    LOG.info(sb.toString());
+
     stateMachine = new StateMachineTez<VertexState, VertexEventType, VertexEvent, VertexImpl>(
         stateMachineFactory.make(this), this);
     augmentStateMachine();
@@ -969,6 +1001,21 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     return vertexConf;
   }
 
+  @Override
+  public int getTaskSchedulerIdentifier() {
+    return this.taskSchedulerIdentifier;
+  }
+
+  @Override
+  public int getContainerLauncherIdentifier() {
+    return this.containerLauncherIdentifier;
+  }
+
+  @Override
+  public int getTaskCommunicatorIdentifier() {
+    return this.taskCommunicatorIdentifier;
+  }
+
   private boolean isSpeculationEnabled() {
     return isSpeculationEnabled;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/cee48099/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index 34001ed..621e4a8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
@@ -36,73 +37,93 @@ public class ContainerLauncherRouter extends AbstractService
 
   static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class);
 
-  private final ContainerLauncher containerLauncher;
+  private final ContainerLauncher containerLaunchers[];
 
   @VisibleForTesting
   public ContainerLauncherRouter(ContainerLauncher containerLauncher) {
     super(ContainerLauncherRouter.class.getName());
-    this.containerLauncher = containerLauncher;
+    containerLaunchers = new ContainerLauncher[] {containerLauncher};
   }
 
   // Accepting conf to setup final parameters, if required.
-  public ContainerLauncherRouter(Configuration conf, boolean isLocal, AppContext context,
+  public ContainerLauncherRouter(Configuration conf, AppContext context,
                                  TaskAttemptListener taskAttemptListener,
-                                 String workingDirectory) throws UnknownHostException {
+                                 String workingDirectory,
+                                 String[] containerLauncherClassIdentifiers) throws UnknownHostException {
     super(ContainerLauncherRouter.class.getName());
 
-    if (isLocal) {
+    if (containerLauncherClassIdentifiers == null || containerLauncherClassIdentifiers.length == 0) {
+      containerLauncherClassIdentifiers = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+    }
+    containerLaunchers = new ContainerLauncher[containerLauncherClassIdentifiers.length];
+
+    for (int i = 0; i < containerLauncherClassIdentifiers.length; i++) {
+      containerLaunchers[i] = createContainerLauncher(containerLauncherClassIdentifiers[i], context,
+          taskAttemptListener, workingDirectory, conf);
+    }
+  }
+
+  private ContainerLauncher createContainerLauncher(String containerLauncherClassIdentifier,
+                                                    AppContext context,
+                                                    TaskAttemptListener taskAttemptListener,
+                                                    String workingDirectory,
+                                                    Configuration conf) throws
+      UnknownHostException {
+    if (containerLauncherClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
+      LOG.info("Creating DefaultContainerLauncher");
+      return new ContainerLauncherImpl(context);
+    } else if (containerLauncherClassIdentifier
+        .equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
       LOG.info("Creating LocalContainerLauncher");
-      containerLauncher =
+      return
           new LocalContainerLauncher(context, taskAttemptListener, workingDirectory);
     } else {
-      // TODO: Temporary reflection with specific parameters until a clean interface is defined.
-      String containerLauncherClassName =
-          conf.get(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS);
-      if (containerLauncherClassName == null) {
-        LOG.info("Creating Default Container Launcher");
-        containerLauncher = new ContainerLauncherImpl(context);
-      } else {
-        LOG.info("Creating container launcher : " + containerLauncherClassName);
-        Class<? extends ContainerLauncher> containerLauncherClazz =
-            (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(
-                containerLauncherClassName);
-        try {
-          Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
-              .getConstructor(AppContext.class, Configuration.class, TaskAttemptListener.class);
-          ctor.setAccessible(true);
-          containerLauncher = ctor.newInstance(context, conf, taskAttemptListener);
-        } catch (NoSuchMethodException e) {
-          throw new TezUncheckedException(e);
-        } catch (InvocationTargetException e) {
-          throw new TezUncheckedException(e);
-        } catch (InstantiationException e) {
-          throw new TezUncheckedException(e);
-        } catch (IllegalAccessException e) {
-          throw new TezUncheckedException(e);
-        }
+      LOG.info("Creating container launcher : " + containerLauncherClassIdentifier);
+      Class<? extends ContainerLauncher> containerLauncherClazz =
+          (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(
+              containerLauncherClassIdentifier);
+      try {
+        Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
+            .getConstructor(AppContext.class, Configuration.class, TaskAttemptListener.class);
+        ctor.setAccessible(true);
+        return ctor.newInstance(context, conf, taskAttemptListener);
+      } catch (NoSuchMethodException e) {
+        throw new TezUncheckedException(e);
+      } catch (InvocationTargetException e) {
+        throw new TezUncheckedException(e);
+      } catch (InstantiationException e) {
+        throw new TezUncheckedException(e);
+      } catch (IllegalAccessException e) {
+        throw new TezUncheckedException(e);
       }
-
     }
+    // TODO TEZ-2118 Handle routing to multiple launchers
   }
 
   @Override
   public void serviceInit(Configuration conf) {
-    ((AbstractService)containerLauncher).init(conf);
+    for (int i = 0 ; i < containerLaunchers.length ; i++) {
+      ((AbstractService) containerLaunchers[i]).init(conf);
+    }
   }
 
   @Override
   public void serviceStart() {
-    ((AbstractService)containerLauncher).start();
+    for (int i = 0 ; i < containerLaunchers.length ; i++) {
+      ((AbstractService) containerLaunchers[i]).start();
+    }
   }
 
   @Override
   public void serviceStop() {
-    ((AbstractService)containerLauncher).stop();
+    for (int i = 0 ; i < containerLaunchers.length ; i++) {
+      ((AbstractService) containerLaunchers[i]).stop();
+    }
   }
 
 
   @Override
   public void handle(NMCommunicatorEvent event) {
-    containerLauncher.handle(event);
+    containerLaunchers[0].handle(event);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/cee48099/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
index 5c4d43c..c59193c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
@@ -38,11 +38,16 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
   private final TaskSpec remoteTaskSpec;
   private final TaskAttempt taskAttempt;
 
+  private final int schedulerId;
+  private final int launcherId;
+  private final int taskCommId;
+
   public AMSchedulerEventTALaunchRequest(TezTaskAttemptID attemptId,
       Resource capability,
       TaskSpec remoteTaskSpec, TaskAttempt ta,
       TaskLocationHint locationHint, int priority,
-      ContainerContext containerContext) {
+      ContainerContext containerContext,
+      int schedulerId, int launcherId, int taskCommId) {
     super(AMSchedulerEventType.S_TA_LAUNCH_REQUEST);
     this.attemptId = attemptId;
     this.capability = capability;
@@ -51,6 +56,9 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
     this.locationHint = locationHint;
     this.priority = priority;
     this.containerContext = containerContext;
+    this.schedulerId = schedulerId;
+    this.launcherId = launcherId;
+    this.taskCommId = taskCommId;
   }
 
   public TezTaskAttemptID getAttemptID() {
@@ -81,6 +89,18 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
     return this.containerContext;
   }
 
+  public int getSchedulerId() {
+    return schedulerId;
+  }
+
+  public int getLauncherId() {
+    return launcherId;
+  }
+
+  public int getTaskCommId() {
+    return taskCommId;
+  }
+
   // Parameter replacement: @taskid@ will not be usable
   // ProfileTaskRange not available along with ContainerReUse
 

http://git-wip-us.apache.org/repos/asf/tez/blob/cee48099/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 8c3ed87..72389e7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TaskLocationHint.TaskBasedLocationAffinity;
@@ -92,7 +93,6 @@ public class TaskSchedulerEventHandler extends AbstractService
   @SuppressWarnings("rawtypes")
   private final EventHandler eventHandler;
   private final String historyUrl;
-  protected TaskSchedulerService taskScheduler;
   private DAGAppMaster dagAppMaster;
   private Map<ApplicationAccessType, String> appAcls = null;
   private Thread eventHandlingThread;
@@ -105,14 +105,27 @@ public class TaskSchedulerEventHandler extends AbstractService
   private AtomicBoolean shouldUnregisterFlag =
       new AtomicBoolean(false);
   private final WebUIService webUI;
+  private final String[] taskSchedulerClasses;
+  protected final TaskSchedulerService []taskSchedulers;
 
   BlockingQueue<AMSchedulerEvent> eventQueue
                               = new LinkedBlockingQueue<AMSchedulerEvent>();
 
+  /**
+   *
+   * @param appContext
+   * @param clientService
+   * @param eventHandler
+   * @param containerSignatureMatcher
+   * @param webUI
+   * @param schedulerClasses the list of scheduler classes / codes. Tez internal classes are represented as codes.
+   *                         An empty list defaults to using the YarnTaskScheduler as the only source.
+   */
   @SuppressWarnings("rawtypes")
   public TaskSchedulerEventHandler(AppContext appContext,
       DAGClientServer clientService, EventHandler eventHandler, 
-      ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) {
+      ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI,
+      String [] schedulerClasses) {
     super(TaskSchedulerEventHandler.class.getName());
     this.appContext = appContext;
     this.eventHandler = eventHandler;
@@ -123,6 +136,12 @@ public class TaskSchedulerEventHandler extends AbstractService
     if (this.webUI != null) {
       this.webUI.setHistoryUrl(this.historyUrl);
     }
+    if (schedulerClasses == null || schedulerClasses.length == 0) {
+      this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+    } else {
+      this.taskSchedulerClasses = schedulerClasses;
+    }
+    taskSchedulers = new TaskSchedulerService[this.taskSchedulerClasses.length];
   }
 
   public Map<ApplicationAccessType, String> getApplicationAcls() {
@@ -139,11 +158,11 @@ public class TaskSchedulerEventHandler extends AbstractService
   }
   
   public Resource getAvailableResources() {
-    return taskScheduler.getAvailableResources();
+    return taskSchedulers[0].getAvailableResources();
   }
 
   public Resource getTotalResources() {
-    return taskScheduler.getTotalResources();
+    return taskSchedulers[0].getTotalResources();
   }
 
   public synchronized void handleEvent(AMSchedulerEvent sEvent) {
@@ -209,9 +228,9 @@ public class TaskSchedulerEventHandler extends AbstractService
 
   private void handleNodeBlacklistUpdate(AMSchedulerEventNodeBlacklistUpdate event) {
     if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
-      taskScheduler.blacklistNode(event.getNodeId());
+      taskSchedulers[0].blacklistNode(event.getNodeId());
     } else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) {
-      taskScheduler.unblacklistNode(event.getNodeId());
+      taskSchedulers[0].unblacklistNode(event.getNodeId());
     } else {
       throw new TezUncheckedException("Invalid event type: " + event.getType());
     }
@@ -223,14 +242,14 @@ public class TaskSchedulerEventHandler extends AbstractService
     // TODO what happens to the task that was connected to this container?
     // current assumption is that it will eventually call handleTaStopRequest
     //TaskAttempt taskAttempt = (TaskAttempt)
-    taskScheduler.deallocateContainer(containerId);
+    taskSchedulers[0].deallocateContainer(containerId);
     // TODO does this container need to be stopped via C_STOP_REQUEST
     sendEvent(new AMContainerEventStopRequest(containerId));
   }
 
   private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
     TaskAttempt attempt = event.getAttempt();
-    boolean wasContainerAllocated = taskScheduler.deallocateTask(attempt, false);
+    boolean wasContainerAllocated = taskSchedulers[0].deallocateTask(attempt, false);
     // use stored value of container id in case the scheduler has removed this
     // assignment because the task has been deallocated earlier.
     // retroactive case
@@ -272,7 +291,7 @@ public class TaskSchedulerEventHandler extends AbstractService
           event.getAttemptID()));
     }
 
-    boolean wasContainerAllocated = taskScheduler.deallocateTask(attempt, true);
+    boolean wasContainerAllocated = taskSchedulers[0].deallocateTask(attempt, true);
     if (!wasContainerAllocated) {
       LOG.error("De-allocated successful task: " + attempt.getID()
           + ", but TaskScheduler reported no container assigned to task");
@@ -297,7 +316,7 @@ public class TaskSchedulerEventHandler extends AbstractService
         TaskAttempt affinityAttempt = vertex.getTask(taskIndex).getSuccessfulAttempt();
         if (affinityAttempt != null) {
           Preconditions.checkNotNull(affinityAttempt.getAssignedContainerID(), affinityAttempt.getID());
-          taskScheduler.allocateTask(taskAttempt,
+          taskSchedulers[0].allocateTask(taskAttempt,
               event.getCapability(),
               affinityAttempt.getAssignedContainerID(),
               Priority.newInstance(event.getPriority()),
@@ -316,57 +335,59 @@ public class TaskSchedulerEventHandler extends AbstractService
             .toArray(new String[locationHint.getRacks().size()]) : null;
       }
     }
-    
-    taskScheduler.allocateTask(taskAttempt,
-                               event.getCapability(),
-                               hosts,
-                               racks,
-                               Priority.newInstance(event.getPriority()),
-                               event.getContainerContext(),
-                               event);
-  }
-
-
-  protected TaskSchedulerService createTaskScheduler(String host, int port,
-      String trackingUrl, AppContext appContext) {
-    boolean isLocal = getConfig().getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
-        TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
-    if (isLocal) {
-      LOG.info("Using TaskScheduler: LocalTaskSchedulerService");
+
+    taskSchedulers[0].allocateTask(taskAttempt,
+        event.getCapability(),
+        hosts,
+        racks,
+        Priority.newInstance(event.getPriority()),
+        event.getContainerContext(),
+        event);
+  }
+
+  private TaskSchedulerService createTaskScheduler(String host, int port, String trackingUrl,
+                                                   AppContext appContext,
+                                                   String schedulerClassName) {
+    if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
+      LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
+      return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
+          host, port, trackingUrl, appContext);
+    } else if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
+      LOG.info("Creating TaskScheduler: Local TaskScheduler");
       return new LocalTaskSchedulerService(this, this.containerSignatureMatcher,
           host, port, trackingUrl, appContext);
-    }
-    else {
-      String schedulerClassName = getConfig().get(TezConfiguration.TEZ_AM_TASK_SCHEDULER_CLASS);
-      if (schedulerClassName == null) {
-        LOG.info("Using TaskScheduler: YarnTaskSchedulerService");
-        return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
-            host, port, trackingUrl, appContext);
-      } else {
-        LOG.info("Using custom TaskScheduler: " + schedulerClassName);
-        // TODO Temporary reflection with specific parameters. Remove once there is a clean interface.
-        Class<? extends TaskSchedulerService> taskSchedulerClazz =
-            (Class<? extends TaskSchedulerService>) ReflectionUtils.getClazz(schedulerClassName);
-        try {
-          Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz
-              .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class,
-                  int.class, String.class, Configuration.class);
-          ctor.setAccessible(true);
-          TaskSchedulerService taskSchedulerService =
-              ctor.newInstance(this, appContext, host, port, trackingUrl, getConfig());
-          return taskSchedulerService;
-        } catch (NoSuchMethodException e) {
-          throw new TezUncheckedException(e);
-        } catch (InvocationTargetException e) {
-          throw new TezUncheckedException(e);
-        } catch (InstantiationException e) {
-          throw new TezUncheckedException(e);
-        } catch (IllegalAccessException e) {
-          throw new TezUncheckedException(e);
-        }
+    } else {
+      LOG.info("Creating custom TaskScheduler: " + schedulerClassName);
+      // TODO TEZ-2003 Temporary reflection with specific parameters. Remove once there is a clean interface.
+      Class<? extends TaskSchedulerService> taskSchedulerClazz =
+          (Class<? extends TaskSchedulerService>) ReflectionUtils.getClazz(schedulerClassName);
+      try {
+        Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz
+            .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class,
+                int.class, String.class, Configuration.class);
+        ctor.setAccessible(true);
+        return ctor.newInstance(this, appContext, host, port, trackingUrl, getConfig());
+      } catch (NoSuchMethodException e) {
+        throw new TezUncheckedException(e);
+      } catch (InvocationTargetException e) {
+        throw new TezUncheckedException(e);
+      } catch (InstantiationException e) {
+        throw new TezUncheckedException(e);
+      } catch (IllegalAccessException e) {
+        throw new TezUncheckedException(e);
       }
     }
   }
+
+  @VisibleForTesting
+  protected void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) {
+    // Iterate over the list and create all the taskSchedulers
+    for (int i = 0; i < taskSchedulerClasses.length; i++) {
+      taskSchedulers[i] = createTaskScheduler(host, port,
+          trackingUrl, appContext, taskSchedulerClasses[i]);
+    }
+  }
+
   
   @Override
   public synchronized void serviceStart() {
@@ -377,13 +398,17 @@ public class TaskSchedulerEventHandler extends AbstractService
     // always try to connect to AM and proxy the response. hence it wont work if the webUIService
     // is not enabled.
     String trackingUrl = (webUI != null) ? webUI.getTrackingURL() : "";
-    taskScheduler = createTaskScheduler(serviceAddr.getHostName(),
-        serviceAddr.getPort(), trackingUrl, appContext);
-    taskScheduler.init(getConfig());
-    taskScheduler.start();
+    instantiateScheduelrs(serviceAddr.getHostName(), serviceAddr.getPort(), trackingUrl, appContext);
+
+    for (int i = 0 ; i < taskSchedulers.length ; i++) {
+      taskSchedulers[i].init(getConfig());
+      taskSchedulers[i].start();
+    }
+
+    // TODO TEZ-2118 Start using multiple task schedulers
     if (shouldUnregisterFlag.get()) {
       // Flag may have been set earlier when task scheduler was not initialized
-      taskScheduler.setShouldUnregister();
+      taskSchedulers[0].setShouldUnregister();
     }
 
     this.eventHandlingThread = new Thread("TaskSchedulerEventHandlerThread") {
@@ -432,8 +457,8 @@ public class TaskSchedulerEventHandler extends AbstractService
       if (eventHandlingThread != null)
         eventHandlingThread.interrupt();
     }
-    if (taskScheduler != null) {
-      ((AbstractService)taskScheduler).stop();
+    if (taskSchedulers[0] != null) {
+      ((AbstractService)taskSchedulers[0]).stop();
     }
   }
 
@@ -578,7 +603,7 @@ public class TaskSchedulerEventHandler extends AbstractService
   public float getProgress() {
     // at this point allocate has been called and so node count must be available
     // may change after YARN-1722
-    int nodeCount = taskScheduler.getClusterNodeCount();
+    int nodeCount = taskSchedulers[0].getClusterNodeCount();
     if (nodeCount != cachedNodeCount) {
       cachedNodeCount = nodeCount;
       sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount));
@@ -593,7 +618,7 @@ public class TaskSchedulerEventHandler extends AbstractService
   }
 
   public void dagCompleted() {
-    taskScheduler.dagComplete();
+    taskSchedulers[0].dagComplete();
   }
 
   public void dagSubmitted() {
@@ -603,7 +628,7 @@ public class TaskSchedulerEventHandler extends AbstractService
 
   @Override
   public void preemptContainer(ContainerId containerId) {
-    taskScheduler.deallocateContainer(containerId);
+    taskSchedulers[0].deallocateContainer(containerId);
     // Inform the Containers about completion.
     sendEvent(new AMContainerEventCompleted(containerId, ContainerExitStatus.INVALID,
         "Container preempted internally", TaskAttemptTerminationCause.INTERNAL_PREEMPTION));
@@ -612,13 +637,13 @@ public class TaskSchedulerEventHandler extends AbstractService
   public void setShouldUnregisterFlag() {
     LOG.info("TaskScheduler notified that it should unregister from RM");
     this.shouldUnregisterFlag.set(true);
-    if (this.taskScheduler != null) {
-      this.taskScheduler.setShouldUnregister();
+    if (this.taskSchedulers[0] != null) {
+      this.taskSchedulers[0].setShouldUnregister();
     }
   }
 
   public boolean hasUnregistered() {
-    return this.taskScheduler.hasUnregistered();
+    return this.taskSchedulers[0].hasUnregistered();
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/tez/blob/cee48099/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 7274cde..aace92b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -507,10 +507,11 @@ public class MockDAGAppMaster extends DAGAppMaster {
     this.handlerConcurrency = handlerConcurrency;
     this.numConcurrentContainers = numConcurrentContainers;
   }
-  
+
   // use mock container launcher for tests
   @Override
-  protected ContainerLauncherRouter createContainerLauncherRouter(final Configuration conf)
+  protected ContainerLauncherRouter createContainerLauncherRouter(final Configuration conf,
+                                                                  String[] containerLaunchers)
       throws UnknownHostException {
     return new ContainerLauncherRouter(containerLauncher);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/cee48099/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 46c412e..33f4817 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -109,18 +110,9 @@ public class TestTaskAttemptListenerImplTezDag {
     doReturn(dag).when(appContext).getCurrentDAG();
     doReturn(appAcls).when(appContext).getApplicationACLs();
     doReturn(amContainerMap).when(appContext).getAllContainers();
-    NodeId nodeId = NodeId.newInstance("localhost", 0);
-    AMContainer amContainer = mock(AMContainer.class);
-    Container container = mock(Container.class);
-    doReturn(nodeId).when(container).getNodeId();
-    doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class));
-    doReturn(container).when(amContainer).getContainer();
-
-    taskAttemptListener =
-        new TaskAttemptListenerImpTezDag(appContext, mock(TaskHeartbeatHandler.class),
-            mock(ContainerHeartbeatHandler.class), null);
-    TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator();
-    TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
+
+    taskAttemptListener = new TaskAttemptListenerImplForTest(appContext,
+        mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null);
 
     taskSpec = mock(TaskSpec.class);
     doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID();
@@ -131,6 +123,9 @@ public class TestTaskAttemptListenerImplTezDag {
   @Test(timeout = 5000)
   public void testGetTask() throws IOException {
 
+    TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator();
+    TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
+
     ContainerId containerId1 = createContainerId(appId, 1);
     ContainerContext containerContext1 = new ContainerContext(containerId1.toString());
     containerTask = tezUmbilical.getTask(containerContext1);

http://git-wip-us.apache.org/repos/asf/tez/blob/cee48099/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 89b77a7..54b9adb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -1322,7 +1322,7 @@ public class TestContainerReuse {
           InputDescriptor.create("inputClassName"), 1)),
       Collections.singletonList(new OutputSpec("vertexName",
           OutputDescriptor.create("outputClassName"), 1)), null), ta, locationHint,
-      priority.getPriority(), containerContext);
+      priority.getPriority(), containerContext, 0, 0, 0);
     return lr;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/cee48099/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index af3e40d..291e786 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -89,13 +89,13 @@ public class TestTaskSchedulerEventHandler {
     public MockTaskSchedulerEventHandler(AppContext appContext,
         DAGClientServer clientService, EventHandler eventHandler,
         ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) {
-      super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI);
+      super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, new String[] {});
     }
-    
+
     @Override
-    protected TaskSchedulerService createTaskScheduler(String host, int port,
-        String trackingUrl, AppContext appContext) {
-      return mockTaskScheduler;
+    protected void instantiateScheduelrs(String host, int port, String trackingUrl,
+                                         AppContext appContext) {
+      taskSchedulers[0] = mockTaskScheduler;
     }
     
     @Override
@@ -194,7 +194,7 @@ public class TestTaskSchedulerEventHandler {
     when(mockAppContext.getCurrentDAG().getVertex(affVertexName)).thenReturn(affVertex);
     Resource resource = Resource.newInstance(100, 1);
     AMSchedulerEventTALaunchRequest event = new AMSchedulerEventTALaunchRequest
-        (taId, resource, null, mockTaskAttempt, locHint, 3, null);
+        (taId, resource, null, mockTaskAttempt, locHint, 3, null, 0, 0, 0);
     schedulerHandler.notify.set(false);
     schedulerHandler.handle(event);
     synchronized (schedulerHandler.notify) {

http://git-wip-us.apache.org/repos/asf/tez/blob/cee48099/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 77c98b7..d775300 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -127,31 +127,29 @@ class TestTaskSchedulerHelpers {
         EventHandler eventHandler,
         TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync,
         ContainerSignatureMatcher containerSignatureMatcher) {
-      super(appContext, null, eventHandler, containerSignatureMatcher, null);
+      super(appContext, null, eventHandler, containerSignatureMatcher, null, new String[]{});
       this.amrmClientAsync = amrmClientAsync;
       this.containerSignatureMatcher = containerSignatureMatcher;
     }
 
     @Override
-    public TaskSchedulerService createTaskScheduler(String host, int port,
-        String trackingUrl, AppContext appContext) {
-      return new TaskSchedulerWithDrainableAppCallback(this,
+    public void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) {
+      taskSchedulers[0] = new TaskSchedulerWithDrainableAppCallback(this,
           containerSignatureMatcher, host, port, trackingUrl, amrmClientAsync,
           appContext);
     }
 
     public TaskSchedulerService getSpyTaskScheduler() {
-      return this.taskScheduler;
+      return taskSchedulers[0];
     }
 
     @Override
     public void serviceStart() {
-      TaskSchedulerService taskSchedulerReal = createTaskScheduler("host", 0, "",
-        appContext);
+      instantiateScheduelrs("host", 0, "", appContext);
       // Init the service so that reuse configuration is picked up.
-      ((AbstractService)taskSchedulerReal).init(getConfig());
-      ((AbstractService)taskSchedulerReal).start();
-      taskScheduler = spy(taskSchedulerReal);
+      ((AbstractService)taskSchedulers[0]).init(getConfig());
+      ((AbstractService)taskSchedulers[0]).start();
+      taskSchedulers[0] = spy(taskSchedulers[0]);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/cee48099/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
index a93c1a4..ae7e7f8 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
@@ -45,6 +45,8 @@ public class TestExternalTezServices {
 
   private static final Log LOG = LogFactory.getLog(TestExternalTezServices.class);
 
+  private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush";
+
   private static MiniTezCluster tezCluster;
   private static MiniDFSCluster dfsCluster;
   private static MiniTezTestServiceCluster tezTestServiceCluster;
@@ -106,12 +108,17 @@ public class TestExternalTezServices {
     remoteFs.mkdirs(stagingDirPath);
     // This is currently configured to push tasks into the Service, and then use the standard RPC
     confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
-    confForJobs.set(TezConfiguration.TEZ_AM_TASK_SCHEDULER_CLASS,
-        TezTestServiceTaskSchedulerService.class.getName());
-    confForJobs.set(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS,
-        TezTestServiceNoOpContainerLauncher.class.getName());
-    confForJobs.set(TezConfiguration.TEZ_AM_TASK_COMMUNICATOR_CLASS,
-        TezTestServiceTaskCommunicatorImpl.class.getName());
+    confForJobs.set(TezConfiguration.TEZ_AM_TASK_SCHEDULERS,
+        EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskSchedulerService.class.getName());
+    confForJobs.set(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS,
+        EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceNoOpContainerLauncher.class.getName());
+    confForJobs.set(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS,
+        EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskCommunicatorImpl.class.getName());
+
+    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, EXT_PUSH_ENTITY_NAME);
+    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, EXT_PUSH_ENTITY_NAME);
+    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, EXT_PUSH_ENTITY_NAME);
+
 
     TezConfiguration tezConf = new TezConfiguration(confForJobs);