You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/02/19 23:59:36 UTC
tez git commit: TEZ-2122. Setup pluggable components at AM/Vertex
level. (sseth)
Repository: tez
Updated Branches:
refs/heads/TEZ-2003 960f65fab -> da2088b58
TEZ-2122. Setup pluggable components at AM/Vertex level. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/da2088b5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/da2088b5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/da2088b5
Branch: refs/heads/TEZ-2003
Commit: da2088b587d83623aa3998dc3b54d78d44c79c5c
Parents: 960f65f
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Feb 19 14:59:18 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Feb 19 14:59:18 2015 -0800
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../apache/tez/dag/api/TezConfiguration.java | 29 +++-
.../org/apache/tez/dag/api/TezConstants.java | 3 +
.../java/org/apache/tez/dag/app/AppContext.java | 4 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 122 +++++++++++++-
.../dag/app/TaskAttemptListenerImpTezDag.java | 77 +++++----
.../java/org/apache/tez/dag/app/dag/Vertex.java | 4 +
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 8 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 47 ++++++
.../app/launcher/ContainerLauncherRouter.java | 93 +++++++----
.../app/rm/AMSchedulerEventTALaunchRequest.java | 22 ++-
.../dag/app/rm/TaskSchedulerEventHandler.java | 163 +++++++++++--------
.../apache/tez/dag/app/MockDAGAppMaster.java | 5 +-
.../app/TestTaskAttemptListenerImplTezDag.java | 13 +-
.../tez/dag/app/rm/TestContainerReuse.java | 2 +-
.../app/rm/TestTaskSchedulerEventHandler.java | 12 +-
.../dag/app/rm/TestTaskSchedulerHelpers.java | 18 +-
.../tez/tests/TestExternalTezServices.java | 19 ++-
18 files changed, 462 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/da2088b5/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 1cd74a4..4bfe08f 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -3,5 +3,6 @@ ALL CHANGES:
TEZ-2006. Task communication plane needs to be pluggable.
TEZ-2090. Add tests for jobs running in external services.
TEZ-2117. Add a manager for ContainerLaunchers running in the AM.
+ TEZ-2122. Setup pluggable components at AM/Vertex level.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/da2088b5/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 6b0a6f8..e0bcd6e 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1136,13 +1136,36 @@ public class TezConfiguration extends Configuration {
+ "tez-ui.webservice.enable";
public static final boolean TEZ_AM_WEBSERVICE_ENABLE_DEFAULT = true;
+ /** defaults container-launcher for the specific vertex */
@ConfigurationScope(Scope.VERTEX)
- public static final String TEZ_AM_CONTAINER_LAUNCHER_CLASS = TEZ_AM_PREFIX + "container-launcher.class";
+ public static final String TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME = TEZ_AM_PREFIX + "vertex.container-launcher.name";
+ /** defaults task-scheduler for the specific vertex */
@ConfigurationScope(Scope.VERTEX)
- public static final String TEZ_AM_TASK_SCHEDULER_CLASS = TEZ_AM_PREFIX + "task-scheduler.class";
+ public static final String TEZ_AM_VERTEX_TASK_SCHEDULER_NAME = TEZ_AM_PREFIX + "vertex.task-scheduler.name";
+ /** defaults task-communicator for the specific vertex */
@ConfigurationScope(Scope.VERTEX)
- public static final String TEZ_AM_TASK_COMMUNICATOR_CLASS = TEZ_AM_PREFIX + "task-communicator.class";
+ public static final String TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME = TEZ_AM_PREFIX + "vertex.task-communicator.name";
+ /** Comma separated list of named container-launcher classes running in the AM.
+ * The format for each entry is NAME:CLASSNAME, except for tez default which is specified as Tez
+ * e.g. Tez, ExtService:org.apache.ExtLauncherClasss
+ * */
+ @ConfigurationScope(Scope.AM)
+ public static final String TEZ_AM_CONTAINER_LAUNCHERS = TEZ_AM_PREFIX + "container-launchers";
+
+ /** Comma separated list of task-schedulers classes running in the AM.
+ * The format for each entry is NAME:CLASSNAME, except for tez default which is specified as Tez
+ * e.g. Tez, ExtService:org.apache.ExtSchedulerClasss
+ */
+ @ConfigurationScope(Scope.AM)
+ public static final String TEZ_AM_TASK_SCHEDULERS = TEZ_AM_PREFIX + "task-schedulers";
+
+ /** Comma separated list of task-communicators classes running in the AM.
+ * The format for each entry is NAME:CLASSNAME, except for tez default which is specified as Tez
+ * e.g. Tez, ExtService:org.apache.ExtTaskCommClass
+ * */
+ @ConfigurationScope(Scope.AM)
+ public static final String TEZ_AM_TASK_COMMUNICATORS = TEZ_AM_PREFIX + "task-communicators";
// TODO only validate property here, value can also be validated if necessary
public static void validateProperty(String property, Scope usedScope) {
http://git-wip-us.apache.org/repos/asf/tez/blob/da2088b5/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
index bc4208f..3b07c59 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
@@ -102,4 +102,7 @@ public class TezConstants {
/// Version-related Environment variables
public static final String TEZ_CLIENT_VERSION_ENV = "TEZ_CLIENT_VERSION";
+
+ public static final String TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT = "Tez";
+ public static final String TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT = "TezLocal";
}
http://git-wip-us.apache.org/repos/asf/tez/blob/da2088b5/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index f22b17a..1f2aaf1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -105,4 +105,8 @@ public interface AppContext {
String getAMUser();
Credentials getAppCredentials();
+
+ public Integer getTaskCommunicatorIdentifier(String name);
+ public Integer getTaskScheduerIdentifier(String name);
+ public Integer getContainerLauncherIdentifier(String name);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/da2088b5/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index b315d81..3b9561c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -56,6 +56,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
@@ -107,7 +109,6 @@ import org.apache.tez.dag.api.client.DAGClientHandler;
import org.apache.tez.dag.api.client.DAGClientServer;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
-import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.RecoveryParser.RecoveredDAGData;
@@ -260,7 +261,12 @@ public class DAGAppMaster extends AbstractService {
private ExecutorService rawExecutor;
private ListeningExecutorService execService;
-
+
+ // TODO May not need to be a bidi map
+ private final BiMap<String, Integer> taskSchedulers = HashBiMap.create();
+ private final BiMap<String, Integer> containerLaunchers = HashBiMap.create();
+ private final BiMap<String, Integer> taskCommunicators = HashBiMap.create();
+
/**
* set of already executed dag names.
*/
@@ -331,6 +337,29 @@ public class DAGAppMaster extends AbstractService {
this.isLocal = conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
+ String tezDefaultClassIdentifier =
+ isLocal ? TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT :
+ TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT;
+
+ String[] taskSchedulerClassIdentifiers = parsePlugins(taskSchedulers,
+ conf.getTrimmedStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS,
+ tezDefaultClassIdentifier),
+ TezConfiguration.TEZ_AM_TASK_SCHEDULERS);
+
+ String[] containerLauncherClassIdentifiers = parsePlugins(containerLaunchers,
+ conf.getTrimmedStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS,
+ tezDefaultClassIdentifier),
+ TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS);
+
+ String[] taskCommunicatorClassIdentifiers = parsePlugins(taskCommunicators,
+ conf.getTrimmedStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS,
+ tezDefaultClassIdentifier),
+ TezConfiguration.TEZ_AM_TASK_COMMUNICATORS);
+
+ LOG.info(buildPluginComponentLog(taskSchedulerClassIdentifiers, taskSchedulers, "TaskSchedulers"));
+ LOG.info(buildPluginComponentLog(containerLauncherClassIdentifiers, containerLaunchers, "ContainerLaunchers"));
+ LOG.info(buildPluginComponentLog(taskCommunicatorClassIdentifiers, taskCommunicators, "TaskCommunicators"));
+
boolean disableVersionCheck = conf.getBoolean(
TezConfiguration.TEZ_AM_DISABLE_CLIENT_VERSION_CHECK,
TezConfiguration.TEZ_AM_DISABLE_CLIENT_VERSION_CHECK_DEFAULT);
@@ -395,7 +424,7 @@ public class DAGAppMaster extends AbstractService {
//service to handle requests to TaskUmbilicalProtocol
taskAttemptListener = createTaskAttemptListener(context,
- taskHeartbeatHandler, containerHeartbeatHandler);
+ taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorClassIdentifiers);
addIfService(taskAttemptListener, true);
containerSignatureMatcher = createContainerSignatureMatcher();
@@ -432,7 +461,8 @@ public class DAGAppMaster extends AbstractService {
}
this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
- clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService);
+ clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService,
+ taskSchedulerClassIdentifiers);
addIfService(taskSchedulerEventHandler, true);
if (enableWebUIService()) {
@@ -450,7 +480,7 @@ public class DAGAppMaster extends AbstractService {
taskSchedulerEventHandler);
addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer);
- this.containerLauncherRouter = createContainerLauncherRouter(conf);
+ this.containerLauncherRouter = createContainerLauncherRouter(conf, containerLauncherClassIdentifiers);
addIfService(containerLauncherRouter, true);
dispatcher.register(NMCommunicatorEventType.class, containerLauncherRouter);
@@ -910,9 +940,9 @@ public class DAGAppMaster extends AbstractService {
}
protected TaskAttemptListener createTaskAttemptListener(AppContext context,
- TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh) {
+ TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, String[] taskCommunicatorClasses) {
TaskAttemptListener lis =
- new TaskAttemptListenerImpTezDag(context, thh, chh,jobTokenSecretManager);
+ new TaskAttemptListenerImpTezDag(context, thh, chh,jobTokenSecretManager, taskCommunicatorClasses);
return lis;
}
@@ -933,9 +963,9 @@ public class DAGAppMaster extends AbstractService {
return chh;
}
- protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf) throws
+ protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf, String []containerLauncherClasses) throws
UnknownHostException {
- return new ContainerLauncherRouter(conf, isLocal, context, taskAttemptListener, workingDirectory);
+ return new ContainerLauncherRouter(conf, context, taskAttemptListener, workingDirectory, containerLauncherClasses);
}
@@ -1357,6 +1387,21 @@ public class DAGAppMaster extends AbstractService {
}
@Override
+ public Integer getTaskCommunicatorIdentifier(String name) {
+ return taskCommunicators.get(name);
+ }
+
+ @Override
+ public Integer getTaskScheduerIdentifier(String name) {
+ return taskSchedulers.get(name);
+ }
+
+ @Override
+ public Integer getContainerLauncherIdentifier(String name) {
+ return taskCommunicators.get(name);
+ }
+
+ @Override
public Map<ApplicationAccessType, String> getApplicationACLs() {
if (getServiceState() != STATE.STARTED) {
throw new TezUncheckedException(
@@ -2090,4 +2135,63 @@ public class DAGAppMaster extends AbstractService {
return amConf.getBoolean(TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE,
TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE_DEFAULT);
}
+
+ // Tez default classnames are populated as TezConfiguration.TEZ_AM_SERVICE_PLUGINS_DEFAULT
+ private String[] parsePlugins(BiMap<String, Integer> pluginMap, String[] pluginStrings,
+ String context) {
+ Preconditions.checkState(pluginStrings != null && pluginStrings.length > 0,
+ "Plugin strings should not be null or empty: " + context);
+
+ String[] classNames = new String[pluginStrings.length];
+
+ int index = 0;
+ for (String pluginString : pluginStrings) {
+
+ String className;
+ String identifierString;
+
+ Preconditions.checkState(pluginString != null && !pluginString.isEmpty(),
+ "Plugin string: " + pluginString + " should not be null or empty");
+ if (pluginString.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT) ||
+ pluginString.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
+ // Kind of ugly, but Tez internal routing is encoded via a String instead of classnames.
+ // Individual components - TaskComm, Scheduler, Launcher deal with actual classname translation,
+ // and avoid reflection.
+ identifierString = pluginString;
+ className = pluginString;
+ } else {
+ String[] parts = pluginString.split(":");
+ Preconditions.checkState(
+ parts.length == 2 && parts[0] != null && !parts[0].isEmpty() && parts[1] != null &&
+ !parts[1].isEmpty(),
+ "Invalid configuration string for " + context + ": " + pluginString);
+ Preconditions.checkState(
+ !parts[0].equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT) &&
+ !parts[0].equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT),
+ "Identifier cannot be " + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT + " or " +
+ TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT + " for " +
+ pluginString);
+ identifierString = parts[0];
+ className = parts[1];
+ }
+ pluginMap.put(identifierString, index);
+ classNames[index] = className;
+ }
+ return classNames;
+ }
+
+ String buildPluginComponentLog(String[] classIdentifiers, BiMap<String, Integer> map,
+ String component) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("AM Level configured ").append(component).append(": ");
+ for (int i = 0; i < classIdentifiers.length; i++) {
+ sb.append("[").append(i).append(":").append(map.inverse().get(i)).append(":")
+ .append(taskSchedulers.inverse().get(i)).append(
+ "]");
+ if (i != classIdentifiers.length - 1) {
+ sb.append(",");
+ }
+ }
+ return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/da2088b5/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 08b50ba..886a2ad 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -37,7 +36,7 @@ import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.api.TaskCommunicatorContext;
import org.apache.tez.dag.api.TaskHeartbeatResponse;
-import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -47,7 +46,6 @@ import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
-import org.apache.tez.dag.app.rm.TaskSchedulerService;
import org.apache.tez.dag.app.rm.container.AMContainerTask;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
@@ -64,7 +62,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
.getLog(TaskAttemptListenerImpTezDag.class);
private final AppContext context;
- private TaskCommunicator taskCommunicator;
+ private final TaskCommunicator[] taskCommunicators;
protected final TaskHeartbeatHandler taskHeartbeatHandler;
protected final ContainerHeartbeatHandler containerHeartbeatHandler;
@@ -90,28 +88,52 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
public TaskAttemptListenerImpTezDag(AppContext context,
TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
// TODO TEZ-2003 pre-merge. Remove reference to JobTokenSecretManager.
- JobTokenSecretManager jobTokenSecretManager) {
+ JobTokenSecretManager jobTokenSecretManager,
+ String [] taskCommunicatorClassIdentifiers) {
super(TaskAttemptListenerImpTezDag.class.getName());
this.context = context;
this.taskHeartbeatHandler = thh;
this.containerHeartbeatHandler = chh;
- this.taskCommunicator = new TezTaskCommunicatorImpl(this);
+ if (taskCommunicatorClassIdentifiers == null || taskCommunicatorClassIdentifiers.length == 0) {
+ taskCommunicatorClassIdentifiers = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+ }
+ this.taskCommunicators = new TaskCommunicator[taskCommunicatorClassIdentifiers.length];
+ for (int i = 0 ; i < taskCommunicatorClassIdentifiers.length ; i++) {
+ taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i]);
+ }
+ // TODO TEZ-2118 Start using taskCommunicator indices properly
+ }
+
+ @Override
+ public void serviceStart() {
+ // TODO Why is init tied to serviceStart
+ for (int i = 0 ; i < taskCommunicators.length ; i++) {
+ taskCommunicators[i].init(getConfig());
+ taskCommunicators[i].start();
+ }
}
@Override
- public void serviceInit(Configuration conf) {
- String taskCommClassName = conf.get(TezConfiguration.TEZ_AM_TASK_COMMUNICATOR_CLASS);
- if (taskCommClassName == null) {
+ public void serviceStop() {
+ for (int i = 0 ; i < taskCommunicators.length ; i++) {
+ taskCommunicators[i].stop();
+ }
+ }
+
+ private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier) {
+ if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT) ||
+ taskCommClassIdentifier
+ .equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
LOG.info("Using Default Task Communicator");
- this.taskCommunicator = new TezTaskCommunicatorImpl(this);
+ return new TezTaskCommunicatorImpl(this);
} else {
- LOG.info("Using TaskCommunicator: " + taskCommClassName);
+ LOG.info("Using TaskCommunicator: " + taskCommClassIdentifier);
Class<? extends TaskCommunicator> taskCommClazz = (Class<? extends TaskCommunicator>) ReflectionUtils
- .getClazz(taskCommClassName);
+ .getClazz(taskCommClassIdentifier);
try {
Constructor<? extends TaskCommunicator> ctor = taskCommClazz.getConstructor(TaskCommunicatorContext.class);
ctor.setAccessible(true);
- this.taskCommunicator = ctor.newInstance(this);
+ return ctor.newInstance(this);
} catch (NoSuchMethodException e) {
throw new TezUncheckedException(e);
} catch (InvocationTargetException e) {
@@ -125,20 +147,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
@Override
- public void serviceStart() {
- taskCommunicator.init(getConfig());
- taskCommunicator.start();
- }
-
- @Override
- public void serviceStop() {
- if (taskCommunicator != null) {
- taskCommunicator.stop();
- taskCommunicator = null;
- }
- }
-
- @Override
public ApplicationAttemptId getApplicationAttemptId() {
return context.getApplicationAttemptId();
}
@@ -214,7 +222,8 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
@Override
public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) {
- context.getEventHandler().handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null));
+ context.getEventHandler()
+ .handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null));
pingContainerHeartbeatHandler(containerId);
}
@@ -244,7 +253,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
@Override
public InetSocketAddress getAddress() {
- return taskCommunicator.getAddress();
+ return taskCommunicators[0].getAddress();
}
// The TaskAttemptListener register / unregister methods in this class are not thread safe.
@@ -260,7 +269,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
"Multiple registrations for containerId: " + containerId);
}
NodeId nodeId = context.getAllContainers().get(containerId).getContainer().getNodeId();
- taskCommunicator.registerRunningContainer(containerId, nodeId.getHost(), nodeId.getPort());
+ taskCommunicators[0].registerRunningContainer(containerId, nodeId.getHost(), nodeId.getPort());
}
@Override
@@ -272,7 +281,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
if (containerInfo.taskAttemptId != null) {
registeredAttempts.remove(containerInfo.taskAttemptId);
}
- taskCommunicator.registerContainerEnd(containerId);
+ taskCommunicators[0].registerContainerEnd(containerId);
}
@Override
@@ -307,7 +316,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
+ amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
+ " when already assigned to: " + containerIdFromMap);
}
- taskCommunicator.registerRunningTaskAttempt(containerId, amContainerTask.getTask(),
+ taskCommunicators[0].registerRunningTaskAttempt(containerId, amContainerTask.getTask(),
amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(),
amContainerTask.haveCredentialsChanged());
}
@@ -327,7 +336,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
// Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
registeredContainers.put(containerId, NULL_CONTAINER_INFO);
- taskCommunicator.unregisterRunningTaskAttempt(attemptId);
+ taskCommunicators[0].unregisterRunningTaskAttempt(attemptId);
}
private void pingContainerHeartbeatHandler(ContainerId containerId) {
@@ -345,6 +354,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
public TaskCommunicator getTaskCommunicator() {
- return taskCommunicator;
+ return taskCommunicators[0];
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/da2088b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 291a0c5..d8470a5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -153,4 +153,8 @@ public interface Vertex extends Comparable<Vertex> {
public int getKilledTaskAttemptCount();
public Configuration getConf();
+
+ public int getTaskSchedulerIdentifier();
+ public int getContainerLauncherIdentifier();
+ public int getTaskCommunicatorIdentifier();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/da2088b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 4a4506e..836ab0d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1055,9 +1055,15 @@ public class TaskAttemptImpl implements TaskAttempt,
priority = (scheduleEvent.getPriorityHighLimit() + scheduleEvent.getPriorityLowLimit()) / 2;
}
+ // TODO Jira post TEZ-2003 getVertex implementation is very inefficient. This should be via references, instead of locked table lookups.
+ Vertex vertex = ta.getVertex();
AMSchedulerEventTALaunchRequest launchRequestEvent = new AMSchedulerEventTALaunchRequest(
ta.attemptId, ta.taskResource, remoteTaskSpec, ta, locationHint,
- priority, ta.containerContext);
+ priority, ta.containerContext,
+ vertex.getTaskSchedulerIdentifier(),
+ vertex.getContainerLauncherIdentifier(),
+ vertex.getTaskCommunicatorIdentifier());
+
ta.sendEvent(launchRequestEvent);
return TaskAttemptStateInternal.START_WAIT;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/da2088b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 145170c..31430d1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -73,6 +73,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.Scope;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.TaskLocationHint;
@@ -218,6 +219,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private final boolean isSpeculationEnabled;
+ private final int taskSchedulerIdentifier;
+ private final int containerLauncherIdentifier;
+ private final int taskCommunicatorIdentifier;
+
//fields initialized in init
@VisibleForTesting
@@ -825,6 +830,33 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// This "this leak" is okay because the retained pointer is in an
// instance variable.
+ boolean isLocal = vertexConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
+ TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
+
+ String tezDefaultComponentName =
+ isLocal ? TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT :
+ TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT;
+ String taskSchedulerName =
+ vertexConf.get(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, tezDefaultComponentName);
+ String taskCommName = vertexConf
+ .get(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, tezDefaultComponentName);
+ String containerLauncherName = vertexConf
+ .get(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, tezDefaultComponentName);
+ taskSchedulerIdentifier = appContext.getTaskScheduerIdentifier(taskSchedulerName);
+ taskCommunicatorIdentifier = appContext.getTaskCommunicatorIdentifier(taskCommName);
+ containerLauncherIdentifier = appContext.getContainerLauncherIdentifier(containerLauncherName);
+
+ Preconditions.checkNotNull(taskSchedulerIdentifier, "Unknown taskScheduler: " + taskSchedulerName);
+ Preconditions.checkNotNull(taskCommunicatorIdentifier, "Unknown taskCommunicator: " + containerLauncherName);
+ Preconditions.checkNotNull(containerLauncherIdentifier, "Unknown containerLauncher: " + taskCommName);
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("Running vertex: ").append(logIdentifier).append(" : ")
+ .append("TaskScheduler=").append(taskSchedulerIdentifier).append(":").append(taskSchedulerName)
+ .append(", ContainerLauncher=").append(containerLauncherIdentifier).append(":").append(containerLauncherName)
+ .append(", TaskCommunicator=").append(taskCommunicatorIdentifier).append(":").append(taskCommName);
+ LOG.info(sb.toString());
+
stateMachine = new StateMachineTez<VertexState, VertexEventType, VertexEvent, VertexImpl>(
stateMachineFactory.make(this), this);
augmentStateMachine();
@@ -835,6 +867,21 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return vertexConf;
}
+ @Override
+ public int getTaskSchedulerIdentifier() {
+ return this.taskSchedulerIdentifier;
+ }
+
+ @Override
+ public int getContainerLauncherIdentifier() {
+ return this.containerLauncherIdentifier;
+ }
+
+ @Override
+ public int getTaskCommunicatorIdentifier() {
+ return this.taskCommunicatorIdentifier;
+ }
+
private boolean isSpeculationEnabled() {
return isSpeculationEnabled;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/da2088b5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index 34001ed..621e4a8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
@@ -36,73 +37,93 @@ public class ContainerLauncherRouter extends AbstractService
static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class);
- private final ContainerLauncher containerLauncher;
+ private final ContainerLauncher containerLaunchers[];
@VisibleForTesting
public ContainerLauncherRouter(ContainerLauncher containerLauncher) {
super(ContainerLauncherRouter.class.getName());
- this.containerLauncher = containerLauncher;
+ containerLaunchers = new ContainerLauncher[] {containerLauncher};
}
// Accepting conf to setup final parameters, if required.
- public ContainerLauncherRouter(Configuration conf, boolean isLocal, AppContext context,
+ public ContainerLauncherRouter(Configuration conf, AppContext context,
TaskAttemptListener taskAttemptListener,
- String workingDirectory) throws UnknownHostException {
+ String workingDirectory,
+ String[] containerLauncherClassIdentifiers) throws UnknownHostException {
super(ContainerLauncherRouter.class.getName());
- if (isLocal) {
+ if (containerLauncherClassIdentifiers == null || containerLauncherClassIdentifiers.length == 0) {
+ containerLauncherClassIdentifiers = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+ }
+ containerLaunchers = new ContainerLauncher[containerLauncherClassIdentifiers.length];
+
+ for (int i = 0; i < containerLauncherClassIdentifiers.length; i++) {
+ containerLaunchers[i] = createContainerLauncher(containerLauncherClassIdentifiers[i], context,
+ taskAttemptListener, workingDirectory, conf);
+ }
+ }
+
+ private ContainerLauncher createContainerLauncher(String containerLauncherClassIdentifier,
+ AppContext context,
+ TaskAttemptListener taskAttemptListener,
+ String workingDirectory,
+ Configuration conf) throws
+ UnknownHostException {
+ if (containerLauncherClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
+ LOG.info("Creating DefaultContainerLauncher");
+ return new ContainerLauncherImpl(context);
+ } else if (containerLauncherClassIdentifier
+ .equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
LOG.info("Creating LocalContainerLauncher");
- containerLauncher =
+ return
new LocalContainerLauncher(context, taskAttemptListener, workingDirectory);
} else {
- // TODO: Temporary reflection with specific parameters until a clean interface is defined.
- String containerLauncherClassName =
- conf.get(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS);
- if (containerLauncherClassName == null) {
- LOG.info("Creating Default Container Launcher");
- containerLauncher = new ContainerLauncherImpl(context);
- } else {
- LOG.info("Creating container launcher : " + containerLauncherClassName);
- Class<? extends ContainerLauncher> containerLauncherClazz =
- (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(
- containerLauncherClassName);
- try {
- Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
- .getConstructor(AppContext.class, Configuration.class, TaskAttemptListener.class);
- ctor.setAccessible(true);
- containerLauncher = ctor.newInstance(context, conf, taskAttemptListener);
- } catch (NoSuchMethodException e) {
- throw new TezUncheckedException(e);
- } catch (InvocationTargetException e) {
- throw new TezUncheckedException(e);
- } catch (InstantiationException e) {
- throw new TezUncheckedException(e);
- } catch (IllegalAccessException e) {
- throw new TezUncheckedException(e);
- }
+ LOG.info("Creating container launcher : " + containerLauncherClassIdentifier);
+ Class<? extends ContainerLauncher> containerLauncherClazz =
+ (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(
+ containerLauncherClassIdentifier);
+ try {
+ Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
+ .getConstructor(AppContext.class, Configuration.class, TaskAttemptListener.class);
+ ctor.setAccessible(true);
+ return ctor.newInstance(context, conf, taskAttemptListener);
+ } catch (NoSuchMethodException e) {
+ throw new TezUncheckedException(e);
+ } catch (InvocationTargetException e) {
+ throw new TezUncheckedException(e);
+ } catch (InstantiationException e) {
+ throw new TezUncheckedException(e);
+ } catch (IllegalAccessException e) {
+ throw new TezUncheckedException(e);
}
-
}
+ // TODO TEZ-2118 Handle routing to multiple launchers
}
@Override
public void serviceInit(Configuration conf) {
- ((AbstractService)containerLauncher).init(conf);
+ for (int i = 0 ; i < containerLaunchers.length ; i++) {
+ ((AbstractService) containerLaunchers[i]).init(conf);
+ }
}
@Override
public void serviceStart() {
- ((AbstractService)containerLauncher).start();
+ for (int i = 0 ; i < containerLaunchers.length ; i++) {
+ ((AbstractService) containerLaunchers[i]).start();
+ }
}
@Override
public void serviceStop() {
- ((AbstractService)containerLauncher).stop();
+ for (int i = 0 ; i < containerLaunchers.length ; i++) {
+ ((AbstractService) containerLaunchers[i]).stop();
+ }
}
@Override
public void handle(NMCommunicatorEvent event) {
- containerLauncher.handle(event);
+ containerLaunchers[0].handle(event);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/da2088b5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
index 5c4d43c..c59193c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
@@ -38,11 +38,16 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
private final TaskSpec remoteTaskSpec;
private final TaskAttempt taskAttempt;
+ private final int schedulerId;
+ private final int launcherId;
+ private final int taskCommId;
+
public AMSchedulerEventTALaunchRequest(TezTaskAttemptID attemptId,
Resource capability,
TaskSpec remoteTaskSpec, TaskAttempt ta,
TaskLocationHint locationHint, int priority,
- ContainerContext containerContext) {
+ ContainerContext containerContext,
+ int schedulerId, int launcherId, int taskCommId) {
super(AMSchedulerEventType.S_TA_LAUNCH_REQUEST);
this.attemptId = attemptId;
this.capability = capability;
@@ -51,6 +56,9 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
this.locationHint = locationHint;
this.priority = priority;
this.containerContext = containerContext;
+ this.schedulerId = schedulerId;
+ this.launcherId = launcherId;
+ this.taskCommId = taskCommId;
}
public TezTaskAttemptID getAttemptID() {
@@ -81,6 +89,18 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
return this.containerContext;
}
+ public int getSchedulerId() {
+ return schedulerId;
+ }
+
+ public int getLauncherId() {
+ return launcherId;
+ }
+
+ public int getTaskCommId() {
+ return taskCommId;
+ }
+
// Parameter replacement: @taskid@ will not be usable
// ProfileTaskRange not available along with ContainerReUse
http://git-wip-us.apache.org/repos/asf/tez/blob/da2088b5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index fb17300..95a6ef4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TaskLocationHint.TaskBasedLocationAffinity;
@@ -92,7 +93,6 @@ public class TaskSchedulerEventHandler extends AbstractService
@SuppressWarnings("rawtypes")
private final EventHandler eventHandler;
private final String historyUrl;
- protected TaskSchedulerService taskScheduler;
private DAGAppMaster dagAppMaster;
private Map<ApplicationAccessType, String> appAcls = null;
private Thread eventHandlingThread;
@@ -105,14 +105,27 @@ public class TaskSchedulerEventHandler extends AbstractService
private AtomicBoolean shouldUnregisterFlag =
new AtomicBoolean(false);
private final WebUIService webUI;
+ private final String[] taskSchedulerClasses;
+ protected final TaskSchedulerService []taskSchedulers;
BlockingQueue<AMSchedulerEvent> eventQueue
= new LinkedBlockingQueue<AMSchedulerEvent>();
+ /**
+ *
+ * @param appContext
+ * @param clientService
+ * @param eventHandler
+ * @param containerSignatureMatcher
+ * @param webUI
+ * @param schedulerClasses the list of scheduler classes / codes. Tez internal classes are represented as codes.
+ * An empty list defaults to using the YarnTaskScheduler as the only source.
+ */
@SuppressWarnings("rawtypes")
public TaskSchedulerEventHandler(AppContext appContext,
DAGClientServer clientService, EventHandler eventHandler,
- ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) {
+ ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI,
+ String [] schedulerClasses) {
super(TaskSchedulerEventHandler.class.getName());
this.appContext = appContext;
this.eventHandler = eventHandler;
@@ -123,6 +136,12 @@ public class TaskSchedulerEventHandler extends AbstractService
if (this.webUI != null) {
this.webUI.setHistoryUrl(this.historyUrl);
}
+ if (schedulerClasses == null || schedulerClasses.length == 0) {
+ this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+ } else {
+ this.taskSchedulerClasses = schedulerClasses;
+ }
+ taskSchedulers = new TaskSchedulerService[this.taskSchedulerClasses.length];
}
public Map<ApplicationAccessType, String> getApplicationAcls() {
@@ -139,11 +158,11 @@ public class TaskSchedulerEventHandler extends AbstractService
}
public Resource getAvailableResources() {
- return taskScheduler.getAvailableResources();
+ return taskSchedulers[0].getAvailableResources();
}
public Resource getTotalResources() {
- return taskScheduler.getTotalResources();
+ return taskSchedulers[0].getTotalResources();
}
public synchronized void handleEvent(AMSchedulerEvent sEvent) {
@@ -209,9 +228,9 @@ public class TaskSchedulerEventHandler extends AbstractService
private void handleNodeBlacklistUpdate(AMSchedulerEventNodeBlacklistUpdate event) {
if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
- taskScheduler.blacklistNode(event.getNodeId());
+ taskSchedulers[0].blacklistNode(event.getNodeId());
} else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) {
- taskScheduler.unblacklistNode(event.getNodeId());
+ taskSchedulers[0].unblacklistNode(event.getNodeId());
} else {
throw new TezUncheckedException("Invalid event type: " + event.getType());
}
@@ -223,14 +242,14 @@ public class TaskSchedulerEventHandler extends AbstractService
// TODO what happens to the task that was connected to this container?
// current assumption is that it will eventually call handleTaStopRequest
//TaskAttempt taskAttempt = (TaskAttempt)
- taskScheduler.deallocateContainer(containerId);
+ taskSchedulers[0].deallocateContainer(containerId);
// TODO does this container need to be stopped via C_STOP_REQUEST
sendEvent(new AMContainerEventStopRequest(containerId));
}
private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
TaskAttempt attempt = event.getAttempt();
- boolean wasContainerAllocated = taskScheduler.deallocateTask(attempt, false);
+ boolean wasContainerAllocated = taskSchedulers[0].deallocateTask(attempt, false);
// use stored value of container id in case the scheduler has removed this
// assignment because the task has been deallocated earlier.
// retroactive case
@@ -272,7 +291,7 @@ public class TaskSchedulerEventHandler extends AbstractService
event.getAttemptID()));
}
- boolean wasContainerAllocated = taskScheduler.deallocateTask(attempt, true);
+ boolean wasContainerAllocated = taskSchedulers[0].deallocateTask(attempt, true);
if (!wasContainerAllocated) {
LOG.error("De-allocated successful task: " + attempt.getID()
+ ", but TaskScheduler reported no container assigned to task");
@@ -297,7 +316,7 @@ public class TaskSchedulerEventHandler extends AbstractService
TaskAttempt affinityAttempt = vertex.getTask(taskIndex).getSuccessfulAttempt();
if (affinityAttempt != null) {
Preconditions.checkNotNull(affinityAttempt.getAssignedContainerID(), affinityAttempt.getID());
- taskScheduler.allocateTask(taskAttempt,
+ taskSchedulers[0].allocateTask(taskAttempt,
event.getCapability(),
affinityAttempt.getAssignedContainerID(),
Priority.newInstance(event.getPriority()),
@@ -316,57 +335,59 @@ public class TaskSchedulerEventHandler extends AbstractService
.toArray(new String[locationHint.getRacks().size()]) : null;
}
}
-
- taskScheduler.allocateTask(taskAttempt,
- event.getCapability(),
- hosts,
- racks,
- Priority.newInstance(event.getPriority()),
- event.getContainerContext(),
- event);
- }
-
-
- protected TaskSchedulerService createTaskScheduler(String host, int port,
- String trackingUrl, AppContext appContext) {
- boolean isLocal = getConfig().getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
- TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
- if (isLocal) {
- LOG.info("Using TaskScheduler: LocalTaskSchedulerService");
+
+ taskSchedulers[0].allocateTask(taskAttempt,
+ event.getCapability(),
+ hosts,
+ racks,
+ Priority.newInstance(event.getPriority()),
+ event.getContainerContext(),
+ event);
+ }
+
+ private TaskSchedulerService createTaskScheduler(String host, int port, String trackingUrl,
+ AppContext appContext,
+ String schedulerClassName) {
+ if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
+ LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
+ return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
+ host, port, trackingUrl, appContext);
+ } else if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
+ LOG.info("Creating TaskScheduler: Local TaskScheduler");
return new LocalTaskSchedulerService(this, this.containerSignatureMatcher,
host, port, trackingUrl, appContext);
- }
- else {
- String schedulerClassName = getConfig().get(TezConfiguration.TEZ_AM_TASK_SCHEDULER_CLASS);
- if (schedulerClassName == null) {
- LOG.info("Using TaskScheduler: YarnTaskSchedulerService");
- return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
- host, port, trackingUrl, appContext);
- } else {
- LOG.info("Using custom TaskScheduler: " + schedulerClassName);
- // TODO Temporary reflection with specific parameters. Remove once there is a clean interface.
- Class<? extends TaskSchedulerService> taskSchedulerClazz =
- (Class<? extends TaskSchedulerService>) ReflectionUtils.getClazz(schedulerClassName);
- try {
- Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz
- .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class,
- int.class, String.class, Configuration.class);
- ctor.setAccessible(true);
- TaskSchedulerService taskSchedulerService =
- ctor.newInstance(this, appContext, host, port, trackingUrl, getConfig());
- return taskSchedulerService;
- } catch (NoSuchMethodException e) {
- throw new TezUncheckedException(e);
- } catch (InvocationTargetException e) {
- throw new TezUncheckedException(e);
- } catch (InstantiationException e) {
- throw new TezUncheckedException(e);
- } catch (IllegalAccessException e) {
- throw new TezUncheckedException(e);
- }
+ } else {
+ LOG.info("Creating custom TaskScheduler: " + schedulerClassName);
+ // TODO TEZ-2003 Temporary reflection with specific parameters. Remove once there is a clean interface.
+ Class<? extends TaskSchedulerService> taskSchedulerClazz =
+ (Class<? extends TaskSchedulerService>) ReflectionUtils.getClazz(schedulerClassName);
+ try {
+ Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz
+ .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class,
+ int.class, String.class, Configuration.class);
+ ctor.setAccessible(true);
+ return ctor.newInstance(this, appContext, host, port, trackingUrl, getConfig());
+ } catch (NoSuchMethodException e) {
+ throw new TezUncheckedException(e);
+ } catch (InvocationTargetException e) {
+ throw new TezUncheckedException(e);
+ } catch (InstantiationException e) {
+ throw new TezUncheckedException(e);
+ } catch (IllegalAccessException e) {
+ throw new TezUncheckedException(e);
}
}
}
+
+ @VisibleForTesting
+ protected void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) {
+ // Iterate over the list and create all the taskSchedulers
+ for (int i = 0; i < taskSchedulerClasses.length; i++) {
+ taskSchedulers[i] = createTaskScheduler(host, port,
+ trackingUrl, appContext, taskSchedulerClasses[i]);
+ }
+ }
+
@Override
public synchronized void serviceStart() {
@@ -377,13 +398,17 @@ public class TaskSchedulerEventHandler extends AbstractService
// always try to connect to AM and proxy the response. hence it wont work if the webUIService
// is not enabled.
String trackingUrl = (webUI != null) ? webUI.getURL() : "";
- taskScheduler = createTaskScheduler(serviceAddr.getHostName(),
- serviceAddr.getPort(), trackingUrl, appContext);
- taskScheduler.init(getConfig());
- taskScheduler.start();
+ instantiateScheduelrs(serviceAddr.getHostName(), serviceAddr.getPort(), trackingUrl, appContext);
+
+ for (int i = 0 ; i < taskSchedulers.length ; i++) {
+ taskSchedulers[i].init(getConfig());
+ taskSchedulers[i].start();
+ }
+
+ // TODO TEZ-2118 Start using multiple task schedulers
if (shouldUnregisterFlag.get()) {
// Flag may have been set earlier when task scheduler was not initialized
- taskScheduler.setShouldUnregister();
+ taskSchedulers[0].setShouldUnregister();
}
this.eventHandlingThread = new Thread("TaskSchedulerEventHandlerThread") {
@@ -432,8 +457,8 @@ public class TaskSchedulerEventHandler extends AbstractService
if (eventHandlingThread != null)
eventHandlingThread.interrupt();
}
- if (taskScheduler != null) {
- ((AbstractService)taskScheduler).stop();
+ if (taskSchedulers[0] != null) {
+ ((AbstractService)taskSchedulers[0]).stop();
}
}
@@ -578,7 +603,7 @@ public class TaskSchedulerEventHandler extends AbstractService
public float getProgress() {
// at this point allocate has been called and so node count must be available
// may change after YARN-1722
- int nodeCount = taskScheduler.getClusterNodeCount();
+ int nodeCount = taskSchedulers[0].getClusterNodeCount();
if (nodeCount != cachedNodeCount) {
cachedNodeCount = nodeCount;
sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount));
@@ -593,12 +618,12 @@ public class TaskSchedulerEventHandler extends AbstractService
}
public void dagCompleted() {
- taskScheduler.resetMatchLocalityForAllHeldContainers();
+ taskSchedulers[0].resetMatchLocalityForAllHeldContainers();
}
@Override
public void preemptContainer(ContainerId containerId) {
- taskScheduler.deallocateContainer(containerId);
+ taskSchedulers[0].deallocateContainer(containerId);
// Inform the Containers about completion.
sendEvent(new AMContainerEventCompleted(containerId, ContainerExitStatus.INVALID,
"Container preempted internally", TaskAttemptTerminationCause.INTERNAL_PREEMPTION));
@@ -607,13 +632,13 @@ public class TaskSchedulerEventHandler extends AbstractService
public void setShouldUnregisterFlag() {
LOG.info("TaskScheduler notified that it should unregister from RM");
this.shouldUnregisterFlag.set(true);
- if (this.taskScheduler != null) {
- this.taskScheduler.setShouldUnregister();
+ if (this.taskSchedulers[0] != null) {
+ this.taskSchedulers[0].setShouldUnregister();
}
}
public boolean hasUnregistered() {
- return this.taskScheduler.hasUnregistered();
+ return this.taskSchedulers[0].hasUnregistered();
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/tez/blob/da2088b5/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index aacf659..ba0f910 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -344,10 +344,11 @@ public class MockDAGAppMaster extends DAGAppMaster {
this.initFailFlag = initFailFlag;
this.startFailFlag = startFailFlag;
}
-
+
// use mock container launcher for tests
@Override
- protected ContainerLauncherRouter createContainerLauncherRouter(final Configuration conf)
+ protected ContainerLauncherRouter createContainerLauncherRouter(final Configuration conf,
+ String[] containerLaunchers)
throws UnknownHostException {
return new ContainerLauncherRouter(containerLauncher);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/da2088b5/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 843f88a..fb831c6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -50,6 +51,8 @@ public class TestTaskAttemptListenerImplTezDag {
@Test(timeout = 5000)
public void testGetTask() throws IOException {
ApplicationId appId = ApplicationId.newInstance(1000, 1);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+ Credentials credentials = new Credentials();
AppContext appContext = mock(AppContext.class);
EventHandler eventHandler = mock(EventHandler.class);
DAG dag = mock(DAG.class);
@@ -57,6 +60,8 @@ public class TestTaskAttemptListenerImplTezDag {
Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
doReturn(eventHandler).when(appContext).getEventHandler();
doReturn(dag).when(appContext).getCurrentDAG();
+ doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
+ doReturn(credentials).when(appContext).getAppCredentials();
doReturn(appAcls).when(appContext).getApplicationACLs();
doReturn(amContainerMap).when(appContext).getAllContainers();
NodeId nodeId = NodeId.newInstance("localhost", 0);
@@ -68,7 +73,7 @@ public class TestTaskAttemptListenerImplTezDag {
TaskAttemptListenerImpTezDag taskAttemptListener =
new TaskAttemptListenerImpTezDag(appContext, mock(TaskHeartbeatHandler.class),
- mock(ContainerHeartbeatHandler.class), null);
+ mock(ContainerHeartbeatHandler.class), null, null);
TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator();
TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
@@ -126,6 +131,8 @@ public class TestTaskAttemptListenerImplTezDag {
@Test(timeout = 5000)
public void testGetTaskMultiplePulls() throws IOException {
ApplicationId appId = ApplicationId.newInstance(1000, 1);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+ Credentials credentials = new Credentials();
AppContext appContext = mock(AppContext.class);
EventHandler eventHandler = mock(EventHandler.class);
DAG dag = mock(DAG.class);
@@ -133,8 +140,10 @@ public class TestTaskAttemptListenerImplTezDag {
Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
doReturn(eventHandler).when(appContext).getEventHandler();
doReturn(dag).when(appContext).getCurrentDAG();
+ doReturn(credentials).when(appContext).getAppCredentials();
doReturn(appAcls).when(appContext).getApplicationACLs();
doReturn(amContainerMap).when(appContext).getAllContainers();
+ doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
NodeId nodeId = NodeId.newInstance("localhost", 0);
AMContainer amContainer = mock(AMContainer.class);
Container container = mock(Container.class);
@@ -144,7 +153,7 @@ public class TestTaskAttemptListenerImplTezDag {
TaskAttemptListenerImpTezDag taskAttemptListener =
new TaskAttemptListenerImpTezDag(appContext, mock(TaskHeartbeatHandler.class),
- mock(ContainerHeartbeatHandler.class), null);
+ mock(ContainerHeartbeatHandler.class), null, null);
TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator();
TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
http://git-wip-us.apache.org/repos/asf/tez/blob/da2088b5/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index d0d3df8..a0e6495 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -1124,7 +1124,7 @@ public class TestContainerReuse {
InputDescriptor.create("inputClassName"), 1)),
Collections.singletonList(new OutputSpec("vertexName",
OutputDescriptor.create("outputClassName"), 1)), null), ta, locationHint,
- priority.getPriority(), containerContext);
+ priority.getPriority(), containerContext, 0, 0, 0);
return lr;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/da2088b5/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index 7bb9d9b..5c09ed1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -79,13 +79,13 @@ public class TestTaskSchedulerEventHandler {
public MockTaskSchedulerEventHandler(AppContext appContext,
DAGClientServer clientService, EventHandler eventHandler,
ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) {
- super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI);
+ super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, new String[] {});
}
-
+
@Override
- protected TaskSchedulerService createTaskScheduler(String host, int port,
- String trackingUrl, AppContext appContext) {
- return mockTaskScheduler;
+ protected void instantiateScheduelrs(String host, int port, String trackingUrl,
+ AppContext appContext) {
+ taskSchedulers[0] = mockTaskScheduler;
}
@Override
@@ -145,7 +145,7 @@ public class TestTaskSchedulerEventHandler {
when(mockAppContext.getCurrentDAG().getVertex(affVertexName)).thenReturn(affVertex);
Resource resource = Resource.newInstance(100, 1);
AMSchedulerEventTALaunchRequest event = new AMSchedulerEventTALaunchRequest
- (taId, resource, null, mockTaskAttempt, locHint, 3, null);
+ (taId, resource, null, mockTaskAttempt, locHint, 3, null, 0, 0, 0);
schedulerHandler.notify.set(false);
schedulerHandler.handle(event);
synchronized (schedulerHandler.notify) {
http://git-wip-us.apache.org/repos/asf/tez/blob/da2088b5/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index bec5320..5d18dae 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -127,31 +127,29 @@ class TestTaskSchedulerHelpers {
EventHandler eventHandler,
TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync,
ContainerSignatureMatcher containerSignatureMatcher) {
- super(appContext, null, eventHandler, containerSignatureMatcher, null);
+ super(appContext, null, eventHandler, containerSignatureMatcher, null, new String[]{});
this.amrmClientAsync = amrmClientAsync;
this.containerSignatureMatcher = containerSignatureMatcher;
}
@Override
- public TaskSchedulerService createTaskScheduler(String host, int port,
- String trackingUrl, AppContext appContext) {
- return new TaskSchedulerWithDrainableAppCallback(this,
+ public void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) {
+ taskSchedulers[0] = new TaskSchedulerWithDrainableAppCallback(this,
containerSignatureMatcher, host, port, trackingUrl, amrmClientAsync,
appContext);
}
public TaskSchedulerService getSpyTaskScheduler() {
- return this.taskScheduler;
+ return taskSchedulers[0];
}
@Override
public void serviceStart() {
- TaskSchedulerService taskSchedulerReal = createTaskScheduler("host", 0, "",
- appContext);
+ instantiateScheduelrs("host", 0, "", appContext);
// Init the service so that reuse configuration is picked up.
- ((AbstractService)taskSchedulerReal).init(getConfig());
- ((AbstractService)taskSchedulerReal).start();
- taskScheduler = spy(taskSchedulerReal);
+ ((AbstractService)taskSchedulers[0]).init(getConfig());
+ ((AbstractService)taskSchedulers[0]).start();
+ taskSchedulers[0] = spy(taskSchedulers[0]);
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/da2088b5/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
index a93c1a4..ae7e7f8 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
@@ -45,6 +45,8 @@ public class TestExternalTezServices {
private static final Log LOG = LogFactory.getLog(TestExternalTezServices.class);
+ private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush";
+
private static MiniTezCluster tezCluster;
private static MiniDFSCluster dfsCluster;
private static MiniTezTestServiceCluster tezTestServiceCluster;
@@ -106,12 +108,17 @@ public class TestExternalTezServices {
remoteFs.mkdirs(stagingDirPath);
// This is currently configured to push tasks into the Service, and then use the standard RPC
confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
- confForJobs.set(TezConfiguration.TEZ_AM_TASK_SCHEDULER_CLASS,
- TezTestServiceTaskSchedulerService.class.getName());
- confForJobs.set(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS,
- TezTestServiceNoOpContainerLauncher.class.getName());
- confForJobs.set(TezConfiguration.TEZ_AM_TASK_COMMUNICATOR_CLASS,
- TezTestServiceTaskCommunicatorImpl.class.getName());
+ confForJobs.set(TezConfiguration.TEZ_AM_TASK_SCHEDULERS,
+ EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskSchedulerService.class.getName());
+ confForJobs.set(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS,
+ EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceNoOpContainerLauncher.class.getName());
+ confForJobs.set(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS,
+ EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskCommunicatorImpl.class.getName());
+
+ confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, EXT_PUSH_ENTITY_NAME);
+ confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, EXT_PUSH_ENTITY_NAME);
+ confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, EXT_PUSH_ENTITY_NAME);
+
TezConfiguration tezConf = new TezConfiguration(confForJobs);