You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/14 22:58:55 UTC
[37/50] [abbrv] tez git commit: TEZ-2652. Cleanup the way services
are specified for an AM and vertices. (sseth)
TEZ-2652. Cleanup the way services are specified for an AM and vertices. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cfd625e9
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cfd625e9
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cfd625e9
Branch: refs/heads/TEZ-2003
Commit: cfd625e95b4edd836638612fbbf0eac7383be3d2
Parents: fc8a4ce
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Jul 28 14:56:20 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 14 13:47:09 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../java/org/apache/tez/client/TezClient.java | 81 ++++++++-
.../org/apache/tez/client/TezClientUtils.java | 56 +++++-
.../main/java/org/apache/tez/dag/api/DAG.java | 48 +++++-
.../apache/tez/dag/api/DagTypeConverters.java | 93 +++++++++-
.../tez/dag/api/NamedEntityDescriptor.java | 33 ++++
.../apache/tez/dag/api/TezConfiguration.java | 31 ----
.../org/apache/tez/dag/api/TezConstants.java | 11 +-
.../java/org/apache/tez/dag/api/Vertex.java | 110 +++++++++++-
.../api/ContainerLauncherDescriptor.java | 32 ++++
.../api/ServicePluginsDescriptor.java | 96 +++++++++++
.../api/TaskCommunicatorDescriptor.java | 33 ++++
.../api/TaskSchedulerDescriptor.java | 32 ++++
tez-api/src/main/proto/DAGApiRecords.proto | 25 +++
.../apache/tez/client/TestTezClientUtils.java | 12 +-
.../org/apache/tez/common/TezUtilsInternal.java | 31 +++-
.../java/org/apache/tez/client/LocalClient.java | 2 +-
.../org/apache/tez/dag/app/DAGAppMaster.java | 172 +++++++++++--------
.../dag/app/TaskAttemptListenerImpTezDag.java | 37 ++--
.../java/org/apache/tez/dag/app/dag/DAG.java | 2 +
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 9 +
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 39 ++++-
.../dag/app/launcher/ContainerLauncherImpl.java | 6 +-
.../app/launcher/ContainerLauncherRouter.java | 40 +++--
.../dag/app/rm/TaskSchedulerEventHandler.java | 47 ++---
.../apache/tez/dag/app/MockDAGAppMaster.java | 5 +-
.../app/TestTaskAttemptListenerImplTezDag.java | 5 +-
.../app/rm/TestTaskSchedulerEventHandler.java | 4 +-
.../dag/app/rm/TestTaskSchedulerHelpers.java | 3 +-
.../org/apache/tez/examples/JoinValidate.java | 31 ++--
.../TezTestServiceContainerLauncher.java | 3 +-
.../tez/examples/JoinValidateConfigured.java | 40 +++--
.../tez/tests/TestExternalTezServices.java | 131 +++++++-------
.../org/apache/tez/runtime/task/TezChild.java | 5 +-
34 files changed, 995 insertions(+), 311 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index e57f76f..a201942 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -37,5 +37,6 @@ ALL CHANGES:
TEZ-2004. Define basic interface for pluggable ContainerLaunchers.
TEZ-2005. Define basic interface for pluggable TaskScheduler.
TEZ-2651. Pluggable services should not extend AbstractService.
+ TEZ-2652. Cleanup the way services are specified for an AM and vertices.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 2590879..8759fdc 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -26,6 +26,7 @@ import java.util.Map;
import javax.annotation.Nullable;
import org.apache.tez.common.RPCUtil;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -111,8 +112,9 @@ public class TezClient {
private static final long SLEEP_FOR_READY = 500;
private JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
- private Map<String, LocalResource> additionalLocalResources = Maps.newHashMap();
- private TezApiVersionInfo apiVersionInfo;
+ private final Map<String, LocalResource> additionalLocalResources = Maps.newHashMap();
+ private final TezApiVersionInfo apiVersionInfo;
+ private final ServicePluginsDescriptor servicePluginsDescriptor;
private HistoryACLPolicyManager historyACLPolicyManager;
private int preWarmDAGCounter = 0;
@@ -142,18 +144,44 @@ public class TezClient {
@Private
protected TezClient(String name, TezConfiguration tezConf, boolean isSession,
+ @Nullable Map<String, LocalResource> localResources,
+ @Nullable Credentials credentials) {
+ this(name, tezConf, isSession, localResources, credentials, null);
+ }
+
+ @Private
+ protected TezClient(String name, TezConfiguration tezConf, boolean isSession,
@Nullable Map<String, LocalResource> localResources,
- @Nullable Credentials credentials) {
+ @Nullable Credentials credentials, ServicePluginsDescriptor servicePluginsDescriptor) {
this.clientName = name;
this.isSession = isSession;
// Set in conf for local mode AM to figure out whether in session mode or not
tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession);
this.amConfig = new AMConfiguration(tezConf, localResources, credentials);
this.apiVersionInfo = new TezApiVersionInfo();
+ this.servicePluginsDescriptor = servicePluginsDescriptor;
LOG.info("Tez Client Version: " + apiVersionInfo.toString());
}
+
+ /**
+ * Create a new TezClientBuilder. This can be used to setup additional parameters
+ * like session mode, local resources, credentials, servicePlugins, etc.
+ * <p>
+ * If session mode is not specified in the builder, this will be inferred from
+ * the provided TezConfiguration.
+ *
+ * @param name Name of the client. Used for logging etc. This will also be used
+ * as app master name in session mode
+ * @param tezConf Configuration for the framework
+ * @return An instance of {@link org.apache.tez.client.TezClient.TezClientBuilder}
+ * which can be used to construct the final TezClient.
+ */
+ public static TezClientBuilder newBuilder(String name, TezConfiguration tezConf) {
+ return new TezClientBuilder(name, tezConf);
+ }
+
/**
* Create a new TezClient. Session or non-session execution mode will be
* inferred from configuration.
@@ -355,7 +383,7 @@ public class TezClient {
sessionAppId,
null, clientName, amConfig,
tezJarResources, sessionCredentials, usingTezArchiveDeploy, apiVersionInfo,
- historyACLPolicyManager);
+ historyACLPolicyManager, servicePluginsDescriptor);
// Set Tez Sessions to not retry on AM crashes if recovery is disabled
if (!amConfig.getTezConfiguration().getBoolean(
@@ -771,7 +799,8 @@ public class TezClient {
ApplicationSubmissionContext appContext = TezClientUtils
.createApplicationSubmissionContext(
appId, dag, dag.getName(), amConfig, tezJarResources, credentials,
- usingTezArchiveDeploy, apiVersionInfo, historyACLPolicyManager);
+ usingTezArchiveDeploy, apiVersionInfo, historyACLPolicyManager,
+ servicePluginsDescriptor);
LOG.info("Submitting DAG to YARN"
+ ", applicationId=" + appId
+ ", dagName=" + dag.getName());
@@ -846,4 +875,46 @@ public class TezClient {
append(SEPARATOR).
append(tezDagIdFormat.get().format(1)).toString();
}
+
+ @Public
+ public static class TezClientBuilder {
+ final String name;
+ final TezConfiguration tezConf;
+ boolean isSession;
+ private Map<String, LocalResource> localResourceMap;
+ private Credentials credentials;
+ ServicePluginsDescriptor servicePluginsDescriptor;
+
+ private TezClientBuilder(String name, TezConfiguration tezConf) {
+ this.name = name;
+ this.tezConf = tezConf;
+ isSession = tezConf.getBoolean(
+ TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT);
+ }
+
+ public TezClientBuilder setIsSession(boolean isSession) {
+ this.isSession = isSession;
+ return this;
+ }
+
+ public TezClientBuilder setLocalResources(Map<String, LocalResource> localResources) {
+ this.localResourceMap = localResources;
+ return this;
+ }
+
+ public TezClientBuilder setCredentials(Credentials credentials) {
+ this.credentials = credentials;
+ return this;
+ }
+
+ public TezClientBuilder setServicePluginDescriptor(ServicePluginsDescriptor servicePluginsDescriptor) {
+ this.servicePluginsDescriptor = servicePluginsDescriptor;
+ return this;
+ }
+
+ public TezClient build() {
+ return new TezClient(name, tezConf, isSession, localResourceMap, credentials,
+ servicePluginsDescriptor);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 8bfaa1f..9cf1f3f 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -39,6 +39,10 @@ import java.util.Map.Entry;
import com.google.common.base.Strings;
import org.apache.commons.lang.StringUtils;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -405,6 +409,7 @@ public class TezClientUtils {
* @param tezJarResources Resources to be used by the AM
* @param sessionCreds the credential object which will be populated with session specific
* @param historyACLPolicyManager
+ * @param servicePluginsDescriptor descriptor for services which may be running in the AM
* @return an ApplicationSubmissionContext to launch a Tez AM
* @throws IOException
* @throws YarnException
@@ -415,7 +420,8 @@ public class TezClientUtils {
ApplicationId appId, DAG dag, String amName,
AMConfiguration amConfig, Map<String, LocalResource> tezJarResources,
Credentials sessionCreds, boolean tezLrsAsArchive,
- TezApiVersionInfo apiVersionInfo, HistoryACLPolicyManager historyACLPolicyManager)
+ TezApiVersionInfo apiVersionInfo, HistoryACLPolicyManager historyACLPolicyManager,
+ ServicePluginsDescriptor servicePluginsDescriptor)
throws IOException, YarnException {
Preconditions.checkNotNull(sessionCreds);
@@ -551,7 +557,7 @@ public class TezClientUtils {
// emit conf as PB file
ConfigurationProto finalConfProto = createFinalConfProtoForApp(amConfig.getTezConfiguration(),
- aclConfigs);
+ aclConfigs, servicePluginsDescriptor);
FSDataOutputStream amConfPBOutBinaryStream = null;
try {
@@ -752,12 +758,8 @@ public class TezClientUtils {
+ "," + TezConstants.TEZ_CONTAINER_LOGGER_NAME);
}
- static ConfigurationProto createFinalConfProtoForApp(Configuration amConf) {
- return createFinalConfProtoForApp(amConf, null);
- }
-
static ConfigurationProto createFinalConfProtoForApp(Configuration amConf,
- Map<String, String> additionalConfigs) {
+ Map<String, String> additionalConfigs, ServicePluginsDescriptor servicePluginsDescriptor) {
assert amConf != null;
ConfigurationProto.Builder builder = ConfigurationProto.newBuilder();
for (Entry<String, String> entry : amConf) {
@@ -774,9 +776,49 @@ public class TezClientUtils {
builder.addConfKeyValues(kvp);
}
}
+
+ AMPluginDescriptorProto pluginDescriptorProto =
+ createAMServicePluginDescriptorProto(servicePluginsDescriptor);
+ builder.setAmPluginDescriptor(pluginDescriptorProto);
+
return builder.build();
}
+ static AMPluginDescriptorProto createAMServicePluginDescriptorProto(
+ ServicePluginsDescriptor servicePluginsDescriptor) {
+ AMPluginDescriptorProto.Builder pluginDescriptorBuilder =
+ AMPluginDescriptorProto.newBuilder();
+ if (servicePluginsDescriptor != null) {
+
+ pluginDescriptorBuilder.setContainersEnabled(servicePluginsDescriptor.areContainersEnabled());
+ pluginDescriptorBuilder.setUberEnabled(servicePluginsDescriptor.isUberEnabled());
+
+ if (servicePluginsDescriptor.getTaskSchedulerDescriptors() != null &&
+ servicePluginsDescriptor.getTaskSchedulerDescriptors().length > 0) {
+ List<TezNamedEntityDescriptorProto> namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto(
+ servicePluginsDescriptor.getTaskSchedulerDescriptors());
+ pluginDescriptorBuilder.addAllTaskScedulers(namedEntityProtos);
+ }
+
+ if (servicePluginsDescriptor.getContainerLauncherDescriptors() != null &&
+ servicePluginsDescriptor.getContainerLauncherDescriptors().length > 0) {
+ List<TezNamedEntityDescriptorProto> namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto(
+ servicePluginsDescriptor.getContainerLauncherDescriptors());
+ pluginDescriptorBuilder.addAllContainerLaunchers(namedEntityProtos);
+ }
+
+ if (servicePluginsDescriptor.getTaskCommunicatorDescriptors() != null &&
+ servicePluginsDescriptor.getTaskCommunicatorDescriptors().length > 0) {
+ List<TezNamedEntityDescriptorProto> namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto(
+ servicePluginsDescriptor.getTaskCommunicatorDescriptors());
+ pluginDescriptorBuilder.addAllTaskCommunicators(namedEntityProtos);
+ }
+
+ } else {
+ pluginDescriptorBuilder.setContainersEnabled(true).setUberEnabled(false);
+ }
+ return pluginDescriptorBuilder.build();
+ }
/**
* Helper function to create a YARN LocalResource
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 8ee1682..fce9522 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -32,6 +32,9 @@ import java.util.Stack;
import org.apache.commons.collections4.BidiMap;
import org.apache.commons.collections4.bidimap.DualLinkedHashBidiMap;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
+import org.apache.tez.dag.api.records.DAGProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -60,9 +63,7 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -92,6 +93,7 @@ public class DAG {
Map<String, LocalResource> commonTaskLocalFiles = Maps.newHashMap();
String dagInfo;
private Map<String,String> dagConf = new HashMap<String, String>();
+ private VertexExecutionContext defaultExecutionContext;
private Stack<String> topologicalVertexStack = new Stack<String>();
@@ -335,6 +337,26 @@ public class DAG {
return this;
}
+ /**
+ * Sets the default execution context for the DAG. This can be overridden at a per Vertex level.
+ * See {@link org.apache.tez.dag.api.Vertex#setExecutionContext(VertexExecutionContext)}
+ *
+ * @param vertexExecutionContext the default execution context for the DAG
+ *
+ * @return this DAG instance
+ */
+ @Public
+ @InterfaceStability.Unstable
+ public synchronized DAG setExecutionContext(VertexExecutionContext vertexExecutionContext) {
+ this.defaultExecutionContext = vertexExecutionContext;
+ return this;
+ }
+
+ @Private
+ VertexExecutionContext getDefaultExecutionContext() {
+ return this.defaultExecutionContext;
+ }
+
@Private
@VisibleForTesting
public Map<String,String> getDagConf() {
@@ -707,7 +729,15 @@ public class DAG {
if (this.dagInfo != null && !this.dagInfo.isEmpty()) {
dagBuilder.setDagInfo(this.dagInfo);
}
-
+
+ // Setup default execution context.
+ VertexExecutionContext defaultContext = getDefaultExecutionContext();
+ if (defaultContext != null) {
+ DAGProtos.VertexExecutionContextProto contextProto = DagTypeConverters.convertToProto(
+ defaultContext);
+ dagBuilder.setDefaultExecutionContext(contextProto);
+ }
+
if (!vertexGroups.isEmpty()) {
for (VertexGroup av : vertexGroups) {
GroupInfo groupInfo = av.getGroupInfo();
@@ -800,7 +830,17 @@ public class DAG {
vertexBuilder.setName(vertex.getName());
vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until TEZ-46.
vertexBuilder.setProcessorDescriptor(DagTypeConverters
- .convertToDAGPlan(vertex.getProcessorDescriptor()));
+ .convertToDAGPlan(vertex.getProcessorDescriptor()));
+
+ // Vertex ExecutionContext setup
+ VertexExecutionContext execContext = vertex.getVertexExecutionContext();
+ if (execContext != null) {
+ DAGProtos.VertexExecutionContextProto contextProto =
+ DagTypeConverters.convertToProto(execContext);
+ vertexBuilder.setExecutionContext(contextProto);
+ }
+ // End of VertexExecutionContext setup.
+
if (vertex.getInputs().size() > 0) {
for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : vertex.getInputs()) {
vertexBuilder.addInputs(DagTypeConverters.convertToDAGPlan(input));
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 8b1d553..2e0d417 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -73,11 +73,15 @@ import org.apache.tez.dag.api.records.DAGProtos.TezCounterGroupProto;
import org.apache.tez.dag.api.records.DAGProtos.TezCounterProto;
import org.apache.tez.dag.api.records.DAGProtos.TezCountersProto;
import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexLocationHintProto;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.ByteString.Output;
+import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
+import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
@Private
public class DagTypeConverters {
@@ -399,6 +403,8 @@ public class DagTypeConverters {
return userPayload;
}
+
+
private static void setUserPayload(EntityDescriptor<?> entity, UserPayload payload) {
if (payload != null) {
entity.setUserPayload(payload);
@@ -423,6 +429,15 @@ public class DagTypeConverters {
return od;
}
+ public static NamedEntityDescriptor convertNamedDescriptorFromProto(TezNamedEntityDescriptorProto proto) {
+ String name = proto.getName();
+ String className = proto.getEntityDescriptor().getClassName();
+ UserPayload payload = convertTezUserPayloadFromDAGPlan(proto.getEntityDescriptor());
+ NamedEntityDescriptor descriptor = new NamedEntityDescriptor(name, className);
+ setUserPayload(descriptor, payload);
+ return descriptor;
+ }
+
public static InputInitializerDescriptor convertInputInitializerDescriptorFromDAGPlan(
TezEntityDescriptorProto proto) {
String className = proto.getClassName();
@@ -550,11 +565,11 @@ public class DagTypeConverters {
public static LocalResource convertPlanLocalResourceToLocalResource(
PlanLocalResource plr) {
return LocalResource.newInstance(
- ConverterUtils.getYarnUrlFromPath(new Path(plr.getUri())),
- DagTypeConverters.convertFromDAGPlan(plr.getType()),
- DagTypeConverters.convertFromDAGPlan(plr.getVisibility()),
- plr.getSize(), plr.getTimeStamp(),
- plr.hasPattern() ? plr.getPattern() : null);
+ ConverterUtils.getYarnUrlFromPath(new Path(plr.getUri())),
+ DagTypeConverters.convertFromDAGPlan(plr.getType()),
+ DagTypeConverters.convertFromDAGPlan(plr.getVisibility()),
+ plr.getSize(), plr.getTimeStamp(),
+ plr.hasPattern() ? plr.getPattern() : null);
}
public static TezCounters convertTezCountersFromProto(TezCountersProto proto) {
@@ -717,4 +732,72 @@ public class DagTypeConverters {
return payload.getPayload();
}
+ public static DAGProtos.VertexExecutionContextProto convertToProto(
+ Vertex.VertexExecutionContext context) {
+ if (context == null) {
+ return null;
+ } else {
+ DAGProtos.VertexExecutionContextProto.Builder builder =
+ DAGProtos.VertexExecutionContextProto.newBuilder();
+ builder.setExecuteInAm(context.shouldExecuteInAm());
+ builder.setExecuteInContainers(context.shouldExecuteInContainers());
+ if (context.getTaskSchedulerName() != null) {
+ builder.setTaskSchedulerName(context.getTaskSchedulerName());
+ }
+ if (context.getContainerLauncherName() != null) {
+ builder.setContainerLauncherName(context.getContainerLauncherName());
+ }
+ if (context.getTaskCommName() != null) {
+ builder.setTaskCommName(context.getTaskCommName());
+ }
+ return builder.build();
+ }
+ }
+
+ public static Vertex.VertexExecutionContext convertFromProto(
+ DAGProtos.VertexExecutionContextProto proto) {
+ if (proto == null) {
+ return null;
+ } else {
+ if (proto.getExecuteInAm()) {
+ Vertex.VertexExecutionContext context =
+ Vertex.VertexExecutionContext.createExecuteInAm(proto.getExecuteInAm());
+ return context;
+ } else if (proto.getExecuteInContainers()) {
+ Vertex.VertexExecutionContext context =
+ Vertex.VertexExecutionContext.createExecuteInContainers(proto.getExecuteInContainers());
+ return context;
+ } else {
+ String taskScheduler = proto.hasTaskSchedulerName() ? proto.getTaskSchedulerName() : null;
+ String containerLauncher =
+ proto.hasContainerLauncherName() ? proto.getContainerLauncherName() : null;
+ String taskComm = proto.hasTaskCommName() ? proto.getTaskCommName() : null;
+ Vertex.VertexExecutionContext context =
+ Vertex.VertexExecutionContext.create(taskScheduler, containerLauncher, taskComm);
+ return context;
+ }
+ }
+ }
+
+ public static List<TezNamedEntityDescriptorProto> convertNamedEntityCollectionToProto(
+ NamedEntityDescriptor[] namedEntityDescriptors) {
+ List<TezNamedEntityDescriptorProto> list =
+ Lists.newArrayListWithCapacity(namedEntityDescriptors.length);
+ for (NamedEntityDescriptor namedEntity : namedEntityDescriptors) {
+ TezNamedEntityDescriptorProto namedEntityProto = convertNamedEntityToProto(namedEntity);
+ list.add(namedEntityProto);
+ }
+ return list;
+ }
+
+ public static TezNamedEntityDescriptorProto convertNamedEntityToProto(
+ NamedEntityDescriptor namedEntityDescriptor) {
+ TezNamedEntityDescriptorProto.Builder builder = TezNamedEntityDescriptorProto.newBuilder();
+ builder.setName(namedEntityDescriptor.getEntityName());
+ DAGProtos.TezEntityDescriptorProto entityProto =
+ DagTypeConverters.convertToDAGPlan(namedEntityDescriptor);
+ builder.setEntityDescriptor(entityProto);
+ return builder.build();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java
new file mode 100644
index 0000000..bad0d10
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+public class NamedEntityDescriptor<T extends EntityDescriptor<T>> extends EntityDescriptor<T> {
+ private final String entityName;
+
+ @InterfaceAudience.Private
+ public NamedEntityDescriptor(String entityName, String className) {
+ super(className);
+ Preconditions.checkArgument(entityName != null, "EntityName must be specified");
+ this.entityName = entityName;
+ }
+
+ public String getEntityName() {
+ return entityName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 39a4c77..3b7378a 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1215,37 +1215,6 @@ public class TezConfiguration extends Configuration {
+ "tez-ui.webservice.enable";
public static final boolean TEZ_AM_WEBSERVICE_ENABLE_DEFAULT = true;
- /** defaults container-launcher for the specific vertex */
- @ConfigurationScope(Scope.VERTEX)
- public static final String TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME = TEZ_AM_PREFIX + "vertex.container-launcher.name";
- /** defaults task-scheduler for the specific vertex */
- @ConfigurationScope(Scope.VERTEX)
- public static final String TEZ_AM_VERTEX_TASK_SCHEDULER_NAME = TEZ_AM_PREFIX + "vertex.task-scheduler.name";
- /** defaults task-communicator for the specific vertex */
- @ConfigurationScope(Scope.VERTEX)
- public static final String TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME = TEZ_AM_PREFIX + "vertex.task-communicator.name";
-
- /** Comma separated list of named container-launcher classes running in the AM.
- * The format for each entry is NAME:CLASSNAME, except for tez default which is specified as Tez
- * e.g. Tez, ExtService:org.apache.ExtLauncherClasss
- * */
- @ConfigurationScope(Scope.AM)
- public static final String TEZ_AM_CONTAINER_LAUNCHERS = TEZ_AM_PREFIX + "container-launchers";
-
- /** Comma separated list of task-schedulers classes running in the AM.
- * The format for each entry is NAME:CLASSNAME, except for tez default which is specified as Tez
- * e.g. Tez, ExtService:org.apache.ExtSchedulerClasss
- */
- @ConfigurationScope(Scope.AM)
- public static final String TEZ_AM_TASK_SCHEDULERS = TEZ_AM_PREFIX + "task-schedulers";
-
- /** Comma separated list of task-communicators classes running in the AM.
- * The format for each entry is NAME:CLASSNAME, except for tez default which is specified as Tez
- * e.g. Tez, ExtService:org.apache.ExtTaskCommClass
- * */
- @ConfigurationScope(Scope.AM)
- public static final String TEZ_AM_TASK_COMMUNICATORS = TEZ_AM_PREFIX + "task-communicators";
-
// TODO only validate property here, value can also be validated if necessary
public static void validateProperty(String property, Scope usedScope) {
Scope validScope = PropertyScope.get(property);
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
index 3b07c59..6e1cb2d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
@@ -102,7 +102,14 @@ public class TezConstants {
/// Version-related Environment variables
public static final String TEZ_CLIENT_VERSION_ENV = "TEZ_CLIENT_VERSION";
+ private static final String TEZ_AM_SERVICE_PLUGIN_NAME_YARN_CONTAINERS = "TezYarn";
+ private static final String TEZ_AM_SERVICE_PLUGIN_NAME_IN_AM = "TezUber";
- public static final String TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT = "Tez";
- public static final String TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT = "TezLocal";
+ public static String getTezYarnServicePluginName() {
+ return TEZ_AM_SERVICE_PLUGIN_NAME_YARN_CONTAINERS;
+ }
+
+ public static String getTezUberServicePluginName() {
+ return TEZ_AM_SERVICE_PLUGIN_NAME_IN_AM;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 0ed4bd8..34124b2 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -28,11 +28,11 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.dag.api.VertexGroup.GroupInfo;
-import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.runtime.api.LogicalIOProcessor;
import com.google.common.base.Preconditions;
@@ -57,6 +57,7 @@ public class Vertex {
private final Map<String, LocalResource> taskLocalResources = new HashMap<String, LocalResource>();
private Map<String, String> taskEnvironment = new HashMap<String, String>();
private Map<String, String> vertexConf = new HashMap<String, String>();
+ private VertexExecutionContext vertexExecutionContext;
private final Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs
= new HashMap<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>();
private final Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> additionalOutputs
@@ -410,6 +411,108 @@ public class Vertex {
return this;
}
+ /**
+ * Sets the execution context for this Vertex - i.e. the Task Scheduler, ContainerLauncher and
+ * TaskCommunicator to be used. Also whether the vertex will be executed within the AM.
+ * If partially specified, the default components in Tez will be used - which may or may not work
+ * with the custom context. Any previously set context is replaced.
+ *
+ * @param vertexExecutionContext the execution context for the vertex.
+ *
+ * @return this Vertex, to allow call chaining
+ */
+ public Vertex setExecutionContext(VertexExecutionContext vertexExecutionContext) {
+ this.vertexExecutionContext = vertexExecutionContext;
+ return this;
+ }
+
+ /**
+  * Describes how the tasks of a {@link Vertex} are to be executed: inside the AM, inside
+  * containers, or through an explicitly named task scheduler, container launcher and task
+  * communicator. The two boolean modes are mutually exclusive, and neither may be combined
+  * with explicit plugin names. All fields are final; instances are obtained through the
+  * static {@code create*} factory methods.
+  */
+ @Public
+ @InterfaceStability.Unstable
+ public static class VertexExecutionContext {
+   final boolean executeInAm;
+   final boolean executeInContainers;
+   final String taskSchedulerName;
+   final String containerLauncherName;
+   final String taskCommName;
+
+   /**
+    * Creates a context that carries only the in-AM flag; container execution is set to false
+    * and no plugin names are set.
+    *
+    * @param executeInAm whether the vertex should execute within the AM
+    * @return a new execution context
+    */
+   public static VertexExecutionContext createExecuteInAm(boolean executeInAm) {
+     return new VertexExecutionContext(executeInAm, false);
+   }
+
+   /**
+    * Creates a context that carries only the in-containers flag; in-AM execution is set to
+    * false and no plugin names are set.
+    *
+    * @param executeInContainers whether the vertex should execute in containers
+    * @return a new execution context
+    */
+   public static VertexExecutionContext createExecuteInContainers(boolean executeInContainers) {
+     return new VertexExecutionContext(false, executeInContainers);
+   }
+
+   /**
+    * Creates a context that refers to plugins by name. Both execution flags are set to false.
+    * Any of the names may be null.
+    *
+    * @param taskSchedulerName name of the task scheduler to use
+    * @param containerLauncherName name of the container launcher to use
+    * @param taskCommName name of the task communicator to use
+    * @return a new execution context
+    */
+   public static VertexExecutionContext create(String taskSchedulerName, String containerLauncherName,
+       String taskCommName) {
+     return new VertexExecutionContext(taskSchedulerName, containerLauncherName, taskCommName);
+   }
+
+   private VertexExecutionContext(boolean executeInAm, boolean executeInContainers) {
+     this(executeInAm, executeInContainers, null, null, null);
+   }
+
+   private VertexExecutionContext(String taskSchedulerName, String containerLauncherName,
+       String taskCommName) {
+     this(false, false, taskSchedulerName, containerLauncherName, taskCommName);
+   }
+
+   /**
+    * Validates the combination of settings and stores them.
+    *
+    * @throws IllegalStateException if both flags are true, or if either flag is true while
+    *         any plugin name is non-null
+    */
+   private VertexExecutionContext(boolean executeInAm, boolean executeInContainers,
+       String taskSchedulerName, String containerLauncherName, String taskCommName) {
+     if (executeInAm || executeInContainers) {
+       Preconditions.checkState(!(executeInAm && executeInContainers),
+           "executeInContainers and executeInAM are mutually exclusive");
+       Preconditions.checkState(
+           taskSchedulerName == null && containerLauncherName == null && taskCommName == null,
+           "Uber (in-AM) or container execution cannot be enabled with a custom plugins. TaskScheduler=" +
+               taskSchedulerName + ", ContainerLauncher=" + containerLauncherName +
+               ", TaskCommunicator=" + taskCommName);
+     }
+     // No separate check is needed for the "plugin names given" direction: a non-null name
+     // combined with either flag is already rejected by the check above, so such a check
+     // could never fire.
+     this.executeInAm = executeInAm;
+     this.executeInContainers = executeInContainers;
+     this.taskSchedulerName = taskSchedulerName;
+     this.containerLauncherName = containerLauncherName;
+     this.taskCommName = taskCommName;
+   }
+
+   /** @return true if this context requests execution within the AM */
+   public boolean shouldExecuteInAm() {
+     return executeInAm;
+   }
+
+   /** @return true if this context requests execution in containers */
+   public boolean shouldExecuteInContainers() {
+     return executeInContainers;
+   }
+
+   /** @return the task scheduler name, or null if none was specified */
+   public String getTaskSchedulerName() {
+     return taskSchedulerName;
+   }
+
+   /** @return the container launcher name, or null if none was specified */
+   public String getContainerLauncherName() {
+     return containerLauncherName;
+   }
+
+   /** @return the task communicator name, or null if none was specified */
+   public String getTaskCommName() {
+     return taskCommName;
+   }
+
+   @Override
+   public String toString() {
+     return "VertexExecutionContext{" +
+         "executeInAm=" + executeInAm +
+         ", executeInContainers=" + executeInContainers +
+         ", taskSchedulerName='" + taskSchedulerName + '\'' +
+         ", containerLauncherName='" + containerLauncherName + '\'' +
+         ", taskCommName='" + taskCommName + '\'' +
+         '}';
+   }
+ }
+
@Override
public String toString() {
return "[" + vertexName + " : " + processorDescriptor.getClassName() + "]";
@@ -475,6 +578,11 @@ public class Vertex {
return dataSinks;
}
+ /**
+ * Returns the execution context held by this vertex, as assigned through
+ * {@link #setExecutionContext(VertexExecutionContext)}.
+ *
+ * @return the execution context; may be null (the field is declared without an initializer)
+ */
+ @Private
+ public VertexExecutionContext getVertexExecutionContext() {
+ return this.vertexExecutionContext;
+ }
+
List<Edge> getInputEdges() {
return inputEdges;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherDescriptor.java
new file mode 100644
index 0000000..ff3c90e
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherDescriptor.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+
+/**
+ * A {@link NamedEntityDescriptor} for a container launcher: a name paired with a class name.
+ * Instances are created through {@link #create(String, String)}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ContainerLauncherDescriptor extends NamedEntityDescriptor<ContainerLauncherDescriptor> {
+
+  /**
+   * Creates a descriptor for a container launcher.
+   *
+   * @param containerLauncherName the name given to the container launcher
+   * @param containerLauncherClassname the class name of the container launcher
+   * @return a new descriptor
+   */
+  public static ContainerLauncherDescriptor create(String containerLauncherName,
+      String containerLauncherClassname) {
+    return new ContainerLauncherDescriptor(containerLauncherName, containerLauncherClassname);
+  }
+
+  // Private: both values are forwarded unchanged to the NamedEntityDescriptor constructor.
+  private ContainerLauncherDescriptor(String name, String className) {
+    super(name, className);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
new file mode 100644
index 0000000..8df102a
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Holds a service plugin setup: two flags (containers enabled, uber enabled) and optional
+ * arrays of task scheduler, container launcher and task communicator descriptors.
+ * Each array is either null (not specified) or contains at least one descriptor.
+ * Instances are created through the static {@code create} methods.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ServicePluginsDescriptor {
+
+  private final boolean enableContainers;
+  private final boolean enableUber;
+
+  private final TaskSchedulerDescriptor[] taskSchedulerDescriptors;
+  private final ContainerLauncherDescriptor[] containerLauncherDescriptors;
+  private final TaskCommunicatorDescriptor[] taskCommunicatorDescriptors;
+
+  /**
+   * Validates and stores the supplied settings.
+   *
+   * @throws IllegalArgumentException if any of the arrays is non-null but empty
+   */
+  private ServicePluginsDescriptor(boolean enableContainers, boolean enableUber,
+      TaskSchedulerDescriptor[] taskSchedulerDescriptors,
+      ContainerLauncherDescriptor[] containerLauncherDescriptors,
+      TaskCommunicatorDescriptor[] taskCommunicatorDescriptors) {
+    this.enableContainers = enableContainers;
+    this.enableUber = enableUber;
+    // Each check must inspect the constructor argument, not the (still unassigned) field;
+    // the parameter names deliberately shadow the fields for that reason.
+    Preconditions.checkArgument(taskSchedulerDescriptors == null || taskSchedulerDescriptors.length > 0,
+        "TaskSchedulerDescriptors should either not be specified or at least 1 should be provided");
+    this.taskSchedulerDescriptors = taskSchedulerDescriptors;
+    Preconditions.checkArgument(containerLauncherDescriptors == null || containerLauncherDescriptors.length > 0,
+        "ContainerLauncherDescriptor should either not be specified or at least 1 should be provided");
+    this.containerLauncherDescriptors = containerLauncherDescriptors;
+    Preconditions.checkArgument(taskCommunicatorDescriptors == null || taskCommunicatorDescriptors.length > 0,
+        "TaskCommunicatorDescriptors should either not be specified or at least 1 should be provided");
+    this.taskCommunicatorDescriptors = taskCommunicatorDescriptors;
+  }
+
+  /**
+   * Creates a descriptor with containers enabled and uber disabled.
+   *
+   * @param taskSchedulerDescriptor task scheduler descriptors; null or non-empty
+   * @param containerLauncherDescriptors container launcher descriptors; null or non-empty
+   * @param taskCommunicatorDescriptors task communicator descriptors; null or non-empty
+   * @return a new descriptor
+   * @throws IllegalArgumentException if any array is non-null but empty
+   */
+  public static ServicePluginsDescriptor create(TaskSchedulerDescriptor[] taskSchedulerDescriptor,
+      ContainerLauncherDescriptor[] containerLauncherDescriptors,
+      TaskCommunicatorDescriptor[] taskCommunicatorDescriptors) {
+    return new ServicePluginsDescriptor(true, false, taskSchedulerDescriptor,
+        containerLauncherDescriptors, taskCommunicatorDescriptors);
+  }
+
+  /**
+   * Creates a descriptor with containers enabled and uber set as requested.
+   *
+   * @param enableUber value of the uber flag
+   * @param taskSchedulerDescriptor task scheduler descriptors; null or non-empty
+   * @param containerLauncherDescriptors container launcher descriptors; null or non-empty
+   * @param taskCommunicatorDescriptors task communicator descriptors; null or non-empty
+   * @return a new descriptor
+   * @throws IllegalArgumentException if any array is non-null but empty
+   */
+  public static ServicePluginsDescriptor create(boolean enableUber,
+      TaskSchedulerDescriptor[] taskSchedulerDescriptor,
+      ContainerLauncherDescriptor[] containerLauncherDescriptors,
+      TaskCommunicatorDescriptor[] taskCommunicatorDescriptors) {
+    return new ServicePluginsDescriptor(true, enableUber, taskSchedulerDescriptor,
+        containerLauncherDescriptors, taskCommunicatorDescriptors);
+  }
+
+  /**
+   * Creates a descriptor with both flags set as requested.
+   *
+   * @param enableContainers value of the containers flag
+   * @param enableUber value of the uber flag
+   * @param taskSchedulerDescriptor task scheduler descriptors; null or non-empty
+   * @param containerLauncherDescriptors container launcher descriptors; null or non-empty
+   * @param taskCommunicatorDescriptors task communicator descriptors; null or non-empty
+   * @return a new descriptor
+   * @throws IllegalArgumentException if any array is non-null but empty
+   */
+  public static ServicePluginsDescriptor create(boolean enableContainers, boolean enableUber,
+      TaskSchedulerDescriptor[] taskSchedulerDescriptor,
+      ContainerLauncherDescriptor[] containerLauncherDescriptors,
+      TaskCommunicatorDescriptor[] taskCommunicatorDescriptors) {
+    return new ServicePluginsDescriptor(enableContainers, enableUber, taskSchedulerDescriptor,
+        containerLauncherDescriptors, taskCommunicatorDescriptors);
+  }
+
+  /**
+   * Creates a descriptor with containers enabled, uber set as requested and no descriptors.
+   *
+   * @param enableUber value of the uber flag
+   * @return a new descriptor
+   */
+  public static ServicePluginsDescriptor create(boolean enableUber) {
+    return new ServicePluginsDescriptor(true, enableUber, null, null, null);
+  }
+
+  /** @return the containers flag */
+  public boolean areContainersEnabled() {
+    return enableContainers;
+  }
+
+  /** @return the uber flag */
+  public boolean isUberEnabled() {
+    return enableUber;
+  }
+
+  /** @return the task scheduler descriptors, or null if none were specified */
+  public TaskSchedulerDescriptor[] getTaskSchedulerDescriptors() {
+    return taskSchedulerDescriptors;
+  }
+
+  /** @return the container launcher descriptors, or null if none were specified */
+  public ContainerLauncherDescriptor[] getContainerLauncherDescriptors() {
+    return containerLauncherDescriptors;
+  }
+
+  /** @return the task communicator descriptors, or null if none were specified */
+  public TaskCommunicatorDescriptor[] getTaskCommunicatorDescriptors() {
+    return taskCommunicatorDescriptors;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorDescriptor.java
new file mode 100644
index 0000000..57ac385
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorDescriptor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+
+/**
+ * A {@link NamedEntityDescriptor} for a task communicator: a name paired with a class name.
+ * Instances are created through {@link #create(String, String)}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TaskCommunicatorDescriptor extends NamedEntityDescriptor<TaskCommunicatorDescriptor> {
+
+  /**
+   * Creates a descriptor for a task communicator.
+   *
+   * @param taskCommName the name given to the task communicator
+   * @param taskCommClassname the class name of the task communicator
+   * @return a new descriptor
+   */
+  public static TaskCommunicatorDescriptor create(String taskCommName, String taskCommClassname) {
+    return new TaskCommunicatorDescriptor(taskCommName, taskCommClassname);
+  }
+
+  // Private: both values are forwarded unchanged to the NamedEntityDescriptor constructor.
+  private TaskCommunicatorDescriptor(String name, String className) {
+    super(name, className);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerDescriptor.java
new file mode 100644
index 0000000..12e0919
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerDescriptor.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+
+/**
+ * A {@link NamedEntityDescriptor} for a task scheduler: a name paired with a class name.
+ * Instances are created through {@link #create(String, String)}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TaskSchedulerDescriptor extends NamedEntityDescriptor<TaskSchedulerDescriptor> {
+
+  /**
+   * Creates a descriptor for a task scheduler.
+   *
+   * @param taskSchedulerName the name given to the task scheduler
+   * @param schedulerClassName the class name of the task scheduler
+   * @return a new descriptor
+   */
+  public static TaskSchedulerDescriptor create(String taskSchedulerName, String schedulerClassName) {
+    return new TaskSchedulerDescriptor(taskSchedulerName, schedulerClassName);
+  }
+
+  // Private: both values are forwarded unchanged to the NamedEntityDescriptor constructor.
+  private TaskSchedulerDescriptor(String name, String className) {
+    super(name, className);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 959d4e6..ebe3259 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -127,6 +127,14 @@ message RootInputLeafOutputProto {
optional TezEntityDescriptorProto controller_descriptor = 3;
}
+// Execution settings for a vertex. The fields correspond, by name, to the fields of
+// Vertex.VertexExecutionContext in the Java API. All fields are optional.
+message VertexExecutionContextProto {
+ optional bool execute_in_am = 1;
+ optional bool execute_in_containers = 2;
+ optional string task_scheduler_name = 3;
+ optional string container_launcher_name = 4;
+ optional string task_comm_name = 5;
+}
+
message VertexPlan {
required string name = 1;
required PlanVertexType type = 2;
@@ -139,6 +147,7 @@ message VertexPlan {
repeated RootInputLeafOutputProto outputs = 9;
optional TezEntityDescriptorProto vertex_manager_plugin = 10;
optional ConfigurationProto vertexConf = 11;
+ optional VertexExecutionContextProto execution_context = 12;
}
message PlanEdgeProperty {
@@ -162,8 +171,23 @@ message EdgePlan {
optional TezEntityDescriptorProto edge_manager = 9;
}
+// A TezEntityDescriptorProto paired with a name.
+message TezNamedEntityDescriptorProto {
+ optional string name = 1;
+ optional TezEntityDescriptorProto entity_descriptor = 2;
+}
+
+
+// Service plugin setup for the AM: two flags plus lists of named plugin descriptors.
+// containers_enabled defaults to true and uber_enabled defaults to false when unset.
+message AMPluginDescriptorProto {
+ optional bool containers_enabled = 1 [default = true];
+ optional bool uber_enabled = 2 [default = false];
+ // NOTE: "scedulers" is a misspelling of "schedulers". The generated Java accessors
+ // (getTaskScedulersList / getTaskScedulersCount) carry the same spelling and are used by
+ // DAGAppMaster, so renaming this field requires updating those callers as well.
+ repeated TezNamedEntityDescriptorProto task_scedulers = 3;
+ repeated TezNamedEntityDescriptorProto container_launchers = 4;
+ repeated TezNamedEntityDescriptorProto task_communicators = 5;
+}
+
message ConfigurationProto {
repeated PlanKeyValuePair confKeyValues = 1;
+ optional AMPluginDescriptorProto am_plugin_descriptor = 2;
}
message DAGPlan {
@@ -175,6 +199,7 @@ message DAGPlan {
repeated PlanVertexGroupInfo vertex_groups = 6;
repeated PlanLocalResource local_resource = 7;
optional string dag_info = 8;
+ optional VertexExecutionContextProto default_execution_context = 9;
}
// DAG monitoring messages
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
index 2d4e005..8946ef0 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
@@ -261,7 +261,7 @@ public class TestTezClientUtils {
ApplicationSubmissionContext appSubmissionContext =
TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
- mock(HistoryACLPolicyManager.class));
+ mock(HistoryACLPolicyManager.class), null);
ContainerLaunchContext amClc = appSubmissionContext.getAMContainerSpec();
Map<String, ByteBuffer> amServiceData = amClc.getServiceData();
@@ -294,7 +294,7 @@ public class TestTezClientUtils {
ApplicationSubmissionContext appSubmissionContext =
TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
- mock(HistoryACLPolicyManager.class));
+ mock(HistoryACLPolicyManager.class), null);
List<String> expectedCommands = new LinkedList<String>();
expectedCommands.add("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator");
@@ -334,7 +334,7 @@ public class TestTezClientUtils {
ApplicationSubmissionContext appSubmissionContext =
TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
- mock(HistoryACLPolicyManager.class));
+ mock(HistoryACLPolicyManager.class), null);
List<String> expectedCommands = new LinkedList<String>();
expectedCommands.add("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator");
@@ -516,7 +516,7 @@ public class TestTezClientUtils {
expected.put("property1", val1);
expected.put("property2", expVal2);
- ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf, null);
+ ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf, null, null);
for (PlanKeyValuePair kvPair : confProto.getConfKeyValuesList()) {
String v = expected.remove(kvPair.getKey());
@@ -620,7 +620,7 @@ public class TestTezClientUtils {
srcConf.set(entry.getKey(), entry.getValue());
}
- ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(srcConf);
+ ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(srcConf, null, null);
for (PlanKeyValuePair kvPair : confProto.getConfKeyValuesList()) {
String val = confMap.remove(kvPair.getKey());
@@ -677,4 +677,6 @@ public class TestTezClientUtils {
Assert.assertTrue(resourceNames.contains("dir2-f.txt"));
}
+ // TODO TEZ-2003 Add test to validate ServicePluginsDescriptor propagation
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 532e83c..1fb7ff9 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -57,7 +57,7 @@ public class TezUtilsInternal {
private static final Logger LOG = LoggerFactory.getLogger(TezUtilsInternal.class);
- public static void addUserSpecifiedTezConfiguration(String baseDir, Configuration conf) throws
+ public static ConfigurationProto readUserSpecifiedTezConfiguration(String baseDir) throws
IOException {
FileInputStream confPBBinaryStream = null;
ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
@@ -72,14 +72,41 @@ public class TezUtilsInternal {
}
ConfigurationProto confProto = confProtoBuilder.build();
+ return confProto;
+ }
- List<PlanKeyValuePair> kvPairList = confProto.getConfKeyValuesList();
+ public static void addUserSpecifiedTezConfiguration(Configuration conf,
+ List<PlanKeyValuePair> kvPairList) {
if (kvPairList != null && !kvPairList.isEmpty()) {
for (PlanKeyValuePair kvPair : kvPairList) {
conf.set(kvPair.getKey(), kvPair.getValue());
}
}
}
+//
+// public static void addUserSpecifiedTezConfiguration(String baseDir, Configuration conf) throws
+// IOException {
+// FileInputStream confPBBinaryStream = null;
+// ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
+// try {
+// confPBBinaryStream =
+// new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME));
+// confProtoBuilder.mergeFrom(confPBBinaryStream);
+// } finally {
+// if (confPBBinaryStream != null) {
+// confPBBinaryStream.close();
+// }
+// }
+//
+// ConfigurationProto confProto = confProtoBuilder.build();
+//
+// List<PlanKeyValuePair> kvPairList = confProto.getConfKeyValuesList();
+// if (kvPairList != null && !kvPairList.isEmpty()) {
+// for (PlanKeyValuePair kvPair : kvPairList) {
+// conf.set(kvPair.getKey(), kvPair.getValue());
+// }
+// }
+// }
public static byte[] compressBytes(byte[] inBytes) throws IOException {
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 1bb2002..508f817 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -341,7 +341,7 @@ public class LocalClient extends FrameworkClient {
String[] localDirs, String[] logDirs, Credentials credentials, String jobUserName) {
return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
new SystemClock(), appSubmitTime, isSession, userDir, localDirs, logDirs,
- versionInfo.getVersion(), 1, credentials, jobUserName);
+ versionInfo.getVersion(), 1, credentials, jobUserName, null);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index f3914d8..8388cfb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -40,6 +40,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -61,7 +62,11 @@ import com.google.common.collect.HashBiMap;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.SessionNotRunning;
+import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDagCleanup;
import org.apache.tez.dag.history.events.DAGRecoveredEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -221,6 +226,7 @@ public class DAGAppMaster extends AbstractService {
private final String workingDirectory;
private final String[] localDirs;
private final String[] logDirs;
+ private final AMPluginDescriptorProto amPluginDescriptorProto;
private ContainerSignatureMatcher containerSignatureMatcher;
private AMContainerMap containers;
private AMNodeTracker nodes;
@@ -312,7 +318,7 @@ public class DAGAppMaster extends AbstractService {
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
Clock clock, long appSubmitTime, boolean isSession, String workingDirectory,
String [] localDirs, String[] logDirs, String clientVersion, int maxAppAttempts,
- Credentials credentials, String jobUserName) {
+ Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) {
super(DAGAppMaster.class.getName());
this.clock = clock;
this.startTime = clock.getTime();
@@ -332,6 +338,7 @@ public class DAGAppMaster extends AbstractService {
this.clientVersion = clientVersion;
this.maxAppAttempts = maxAppAttempts;
this.amCredentials = credentials;
+ this.amPluginDescriptorProto = pluginDescriptorProto;
this.appMasterUgi = UserGroupInformation
.createRemoteUser(jobUserName);
this.appMasterUgi.addCredentials(amCredentials);
@@ -380,28 +387,47 @@ public class DAGAppMaster extends AbstractService {
this.isLocal = conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
- String tezDefaultClassIdentifier =
- isLocal ? TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT :
- TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT;
+ List<NamedEntityDescriptor> taskSchedulerDescriptors;
+ List<NamedEntityDescriptor> containerLauncherDescriptors;
+ List<NamedEntityDescriptor> taskCommunicatorDescriptors;
+ boolean tezYarnEnabled = true;
+ boolean uberEnabled = false;
- String[] taskSchedulerClassIdentifiers = parsePlugins(taskSchedulers,
- conf.getTrimmedStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS,
- tezDefaultClassIdentifier),
- TezConfiguration.TEZ_AM_TASK_SCHEDULERS);
+ if (!isLocal) {
+ if (amPluginDescriptorProto == null) {
+ tezYarnEnabled = true;
+ uberEnabled = false;
+ } else {
+ tezYarnEnabled = amPluginDescriptorProto.getContainersEnabled();
+ uberEnabled = amPluginDescriptorProto.getUberEnabled();
+ }
+ } else {
+ tezYarnEnabled = false;
+ uberEnabled = true;
+ }
+
+ taskSchedulerDescriptors = parsePlugin(taskSchedulers,
+ (amPluginDescriptorProto == null || amPluginDescriptorProto.getTaskScedulersCount() == 0 ?
+ null :
+ amPluginDescriptorProto.getTaskScedulersList()),
+ tezYarnEnabled, uberEnabled);
- String[] containerLauncherClassIdentifiers = parsePlugins(containerLaunchers,
- conf.getTrimmedStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS,
- tezDefaultClassIdentifier),
- TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS);
+ containerLauncherDescriptors = parsePlugin(containerLaunchers,
+ (amPluginDescriptorProto == null ||
+ amPluginDescriptorProto.getContainerLaunchersCount() == 0 ? null :
+ amPluginDescriptorProto.getContainerLaunchersList()),
+ tezYarnEnabled, uberEnabled);
- String[] taskCommunicatorClassIdentifiers = parsePlugins(taskCommunicators,
- conf.getTrimmedStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS,
- tezDefaultClassIdentifier),
- TezConfiguration.TEZ_AM_TASK_COMMUNICATORS);
+ taskCommunicatorDescriptors = parsePlugin(taskCommunicators,
+ (amPluginDescriptorProto == null ||
+ amPluginDescriptorProto.getTaskCommunicatorsCount() == 0 ? null :
+ amPluginDescriptorProto.getTaskCommunicatorsList()),
+ tezYarnEnabled, uberEnabled);
- LOG.info(buildPluginComponentLog(taskSchedulerClassIdentifiers, taskSchedulers, "TaskSchedulers"));
- LOG.info(buildPluginComponentLog(containerLauncherClassIdentifiers, containerLaunchers, "ContainerLaunchers"));
- LOG.info(buildPluginComponentLog(taskCommunicatorClassIdentifiers, taskCommunicators, "TaskCommunicators"));
+
+ LOG.info(buildPluginComponentLog(taskSchedulerDescriptors, taskSchedulers, "TaskSchedulers"));
+ LOG.info(buildPluginComponentLog(containerLauncherDescriptors, containerLaunchers, "ContainerLaunchers"));
+ LOG.info(buildPluginComponentLog(taskCommunicatorDescriptors, taskCommunicators, "TaskCommunicators"));
boolean disableVersionCheck = conf.getBoolean(
TezConfiguration.TEZ_AM_DISABLE_CLIENT_VERSION_CHECK,
@@ -468,7 +494,7 @@ public class DAGAppMaster extends AbstractService {
//service to handle requests to TaskUmbilicalProtocol
taskAttemptListener = createTaskAttemptListener(context,
- taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorClassIdentifiers, isLocal);
+ taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorDescriptors, isLocal);
addIfService(taskAttemptListener, true);
containerSignatureMatcher = createContainerSignatureMatcher();
@@ -516,7 +542,7 @@ public class DAGAppMaster extends AbstractService {
this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService,
- taskSchedulerClassIdentifiers, isLocal);
+ taskSchedulerDescriptors, isLocal);
addIfService(taskSchedulerEventHandler, true);
if (enableWebUIService()) {
@@ -534,7 +560,7 @@ public class DAGAppMaster extends AbstractService {
taskSchedulerEventHandler);
addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer);
- this.containerLauncherRouter = createContainerLauncherRouter(conf, containerLauncherClassIdentifiers, isLocal);
+ this.containerLauncherRouter = createContainerLauncherRouter(conf, containerLauncherDescriptors, isLocal);
addIfService(containerLauncherRouter, true);
dispatcher.register(NMCommunicatorEventType.class, containerLauncherRouter);
@@ -1044,11 +1070,11 @@ public class DAGAppMaster extends AbstractService {
protected TaskAttemptListener createTaskAttemptListener(AppContext context,
TaskHeartbeatHandler thh,
ContainerHeartbeatHandler chh,
- String[] taskCommunicatorClasses,
+ List<NamedEntityDescriptor> entityDescriptors,
boolean isLocal) {
TaskAttemptListener lis =
new TaskAttemptListenerImpTezDag(context, thh, chh,
- taskCommunicatorClasses, amConf, isLocal);
+ entityDescriptors, amConf, isLocal);
return lis;
}
@@ -1070,11 +1096,11 @@ public class DAGAppMaster extends AbstractService {
}
protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf,
- String[] containerLauncherClasses,
+ List<NamedEntityDescriptor> containerLauncherDescriptors,
boolean isLocal) throws
UnknownHostException {
return new ContainerLauncherRouter(conf, context, taskAttemptListener, workingDirectory,
- containerLauncherClasses, isLocal);
+ containerLauncherDescriptors, isLocal);
}
public ApplicationId getAppID() {
@@ -2134,7 +2160,16 @@ public class DAGAppMaster extends AbstractService {
// TODO Does this really need to be a YarnConfiguration ?
Configuration conf = new Configuration(new YarnConfiguration());
- TezUtilsInternal.addUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name()), conf);
+
+ ConfigurationProto confProto =
+ TezUtilsInternal.readUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name()));
+ TezUtilsInternal.addUserSpecifiedTezConfiguration(conf, confProto.getConfKeyValuesList());
+
+ AMPluginDescriptorProto amPluginDescriptorProto = null;
+ if (confProto.hasAmPluginDescriptor()) {
+ amPluginDescriptorProto = confProto.getAmPluginDescriptor();
+ }
+
UserGroupInformation.setConfiguration(conf);
Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
@@ -2146,7 +2181,7 @@ public class DAGAppMaster extends AbstractService {
System.getenv(Environment.PWD.name()),
TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())),
TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOG_DIRS.name())),
- clientVersion, maxAppAttempts, credentials, jobUserName);
+ clientVersion, maxAppAttempts, credentials, jobUserName, amPluginDescriptorProto);
ShutdownHookManager.get().addShutdownHook(
new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
@@ -2252,7 +2287,7 @@ public class DAGAppMaster extends AbstractService {
LOG.info("Running DAG: " + dagPlan.getName());
String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
- System.err.println(timeStamp + " Running Dag: "+ newDAG.getID());
+ System.err.println(timeStamp + " Running Dag: " + newDAG.getID());
System.out.println(timeStamp + " Running Dag: "+ newDAG.getID());
// Job name is the same as the app name until we support multiple dags
// for an app later
@@ -2358,60 +2393,51 @@ public class DAGAppMaster extends AbstractService {
TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE_DEFAULT);
}
- // Tez default classnames are populated as TezConfiguration.TEZ_AM_SERVICE_PLUGINS_DEFAULT
- private String[] parsePlugins(BiMap<String, Integer> pluginMap, String[] pluginStrings,
- String context) {
- // TODO TEZ-2003 Duplicate error checking - ideally in the client itself. Depends on the final API.
- Preconditions.checkState(pluginStrings != null && pluginStrings.length > 0,
- "Plugin strings should not be null or empty: " + context);
-
- String[] classNames = new String[pluginStrings.length];
+ private static List<NamedEntityDescriptor> parsePlugin(
+ BiMap<String, Integer> pluginMap, List<TezNamedEntityDescriptorProto> namedEntityDescriptorProtos,
+ boolean tezYarnEnabled, boolean uberEnabled) {
int index = 0;
- for (String pluginString : pluginStrings) {
-
- String className;
- String identifierString;
-
- Preconditions.checkState(pluginString != null && !pluginString.isEmpty(),
- "Plugin string: " + pluginString + " should not be null or empty");
- if (pluginString.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT) ||
- pluginString.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
- // Kind of ugly, but Tez internal routing is encoded via a String instead of classnames.
- // Individual components - TaskComm, Scheduler, Launcher deal with actual classname translation,
- // and avoid reflection.
- identifierString = pluginString;
- className = pluginString;
- } else {
- String[] parts = pluginString.split(":");
- Preconditions.checkState(
- parts.length == 2 && parts[0] != null && !parts[0].isEmpty() && parts[1] != null &&
- !parts[1].isEmpty(),
- "Invalid configuration string for " + context + ": " + pluginString);
- Preconditions.checkState(
- !parts[0].equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT) &&
- !parts[0].equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT),
- "Identifier cannot be " + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT + " or " +
- TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT + " for " +
- pluginString);
- identifierString = parts[0];
- className = parts[1];
- }
- pluginMap.put(identifierString, index);
- classNames[index] = className;
+
+ List<NamedEntityDescriptor> resultList = new LinkedList<>();
+
+ if (tezYarnEnabled) {
+ // Default classnames will be populated by individual components
+ NamedEntityDescriptor r = new NamedEntityDescriptor(
+ TezConstants.getTezYarnServicePluginName(), null);
+ resultList.add(r);
+ pluginMap.put(TezConstants.getTezYarnServicePluginName(), index);
+ index++;
+ }
+
+ if (uberEnabled) {
+ // Default classnames will be populated by individual components
+ NamedEntityDescriptor r = new NamedEntityDescriptor(
+ TezConstants.getTezUberServicePluginName(), null);
+ resultList.add(r);
+ pluginMap.put(TezConstants.getTezUberServicePluginName(), index);
index++;
}
- return classNames;
+
+ if (namedEntityDescriptorProtos != null) {
+ for (TezNamedEntityDescriptorProto namedEntityDescriptorProto : namedEntityDescriptorProtos) {
+ resultList.add(DagTypeConverters
+ .convertNamedDescriptorFromProto(namedEntityDescriptorProto));
+ pluginMap.put(resultList.get(index).getEntityName(), index);
+ index++;
+ }
+ }
+ return resultList;
}
- String buildPluginComponentLog(String[] classIdentifiers, BiMap<String, Integer> map,
+ String buildPluginComponentLog(List<NamedEntityDescriptor> namedEntityDescriptors, BiMap<String, Integer> map,
String component) {
StringBuilder sb = new StringBuilder();
sb.append("AM Level configured ").append(component).append(": ");
- for (int i = 0; i < classIdentifiers.length; i++) {
+ for (int i = 0; i < namedEntityDescriptors.size(); i++) {
sb.append("[").append(i).append(":").append(map.inverse().get(i))
- .append(":").append(classIdentifiers[i]).append("]");
- if (i != classIdentifiers.length - 1) {
+ .append(":").append(namedEntityDescriptors.get(i).getClassName()).append("]");
+ if (i != namedEntityDescriptors.size() - 1) {
sb.append(",");
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 599c208..1e34184 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -27,8 +27,11 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
@@ -46,7 +49,6 @@ import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.api.TaskCommunicatorContext;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TaskHeartbeatResponse;
-import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -100,28 +102,28 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
public TaskAttemptListenerImpTezDag(AppContext context,
TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
- String [] taskCommunicatorClassIdentifiers,
+ List<NamedEntityDescriptor> taskCommunicatorDescriptors,
Configuration conf,
boolean isPureLocalMode) {
super(TaskAttemptListenerImpTezDag.class.getName());
this.context = context;
this.taskHeartbeatHandler = thh;
this.containerHeartbeatHandler = chh;
- if (taskCommunicatorClassIdentifiers == null || taskCommunicatorClassIdentifiers.length == 0) {
+ if (taskCommunicatorDescriptors == null || taskCommunicatorDescriptors.isEmpty()) {
if (isPureLocalMode) {
- taskCommunicatorClassIdentifiers =
- new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT};
+ taskCommunicatorDescriptors = Lists.newArrayList(new NamedEntityDescriptor(
+ TezConstants.getTezUberServicePluginName(), null));
} else {
- taskCommunicatorClassIdentifiers =
- new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+ taskCommunicatorDescriptors = Lists.newArrayList(new NamedEntityDescriptor(
+ TezConstants.getTezYarnServicePluginName(), null));
}
}
- this.taskCommunicators = new TaskCommunicator[taskCommunicatorClassIdentifiers.length];
- this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorClassIdentifiers.length];
- this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorClassIdentifiers.length];
- for (int i = 0 ; i < taskCommunicatorClassIdentifiers.length ; i++) {
+ this.taskCommunicators = new TaskCommunicator[taskCommunicatorDescriptors.size()];
+ this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorDescriptors.size()];
+ this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorDescriptors.size()];
+ for (int i = 0 ; i < taskCommunicatorDescriptors.size() ; i++) {
taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, conf, i);
- taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i], i);
+ taskCommunicators[i] = createTaskCommunicator(taskCommunicatorDescriptors.get(i), i);
taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskCommunicators[i]);
}
// TODO TEZ-2118 Start using taskCommunicator indices properly
@@ -143,17 +145,18 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
}
- private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier, int taskCommIndex) {
- if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
+ private TaskCommunicator createTaskCommunicator(NamedEntityDescriptor taskCommDescriptor, int taskCommIndex) {
+ if (taskCommDescriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName())) {
LOG.info("Using Default Task Communicator");
return createTezTaskCommunicator(taskCommunicatorContexts[taskCommIndex]);
- } else if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
+ } else if (taskCommDescriptor.getEntityName().equals(TezConstants.getTezUberServicePluginName())) {
LOG.info("Using Default Local Task Communicator");
return new TezLocalTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]);
} else {
- LOG.info("Using TaskCommunicator: " + taskCommClassIdentifier);
+ // TODO TEZ-2003. Use the payload
+ LOG.info("Using TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(), taskCommDescriptor.getClassName());
Class<? extends TaskCommunicator> taskCommClazz = (Class<? extends TaskCommunicator>) ReflectionUtils
- .getClazz(taskCommClassIdentifier);
+ .getClazz(taskCommDescriptor.getClassName());
try {
Constructor<? extends TaskCommunicator> ctor = taskCommClazz.getConstructor(TaskCommunicatorContext.class);
ctor.setAccessible(true);
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 458362f..335239e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -98,4 +98,6 @@ public interface DAG {
StateChangeNotifier getStateChangeNotifier();
+ org.apache.tez.dag.api.Vertex.VertexExecutionContext getDefaultExecutionContext();
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index ec2ef66..25518b0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -717,6 +717,15 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
@Override
+ public org.apache.tez.dag.api.Vertex.VertexExecutionContext getDefaultExecutionContext() {
+ if (jobPlan.hasDefaultExecutionContext()) {
+ return DagTypeConverters.convertFromProto(jobPlan.getDefaultExecutionContext());
+ } else {
+ return null;
+ }
+ }
+
+ @Override
public TezCounters getAllCounters() {
readLock.lock();
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index bdab984..2e8f218 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -995,14 +995,37 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
String tezDefaultComponentName =
- isLocal ? TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT :
- TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT;
- String taskSchedulerName =
- vertexConf.get(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, tezDefaultComponentName);
- String taskCommName = vertexConf
- .get(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, tezDefaultComponentName);
- String containerLauncherName = vertexConf
- .get(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, tezDefaultComponentName);
+ isLocal ? TezConstants.getTezUberServicePluginName() :
+ TezConstants.getTezYarnServicePluginName();
+
+ org.apache.tez.dag.api.Vertex.VertexExecutionContext execContext = dag.getDefaultExecutionContext();
+ if (vertexPlan.hasExecutionContext()) {
+ execContext = DagTypeConverters.convertFromProto(vertexPlan.getExecutionContext());
+ LOG.info("Using ExecutionContext from Vertex for Vertex {}", vertexName);
+ } else if (execContext != null) {
+ LOG.info("Using ExecutionContext from DAG for Vertex {}", vertexName);
+ }
+ if (execContext != null) {
+ if (execContext.shouldExecuteInAm()) {
+ tezDefaultComponentName = TezConstants.getTezUberServicePluginName();
+ }
+ }
+
+ String taskSchedulerName = tezDefaultComponentName;
+ String containerLauncherName = tezDefaultComponentName;
+ String taskCommName = tezDefaultComponentName;
+
+ if (execContext != null) {
+ if (execContext.getTaskSchedulerName() != null) {
+ taskSchedulerName = execContext.getTaskSchedulerName();
+ }
+ if (execContext.getContainerLauncherName() != null) {
+ containerLauncherName = execContext.getContainerLauncherName();
+ }
+ if (execContext.getTaskCommName() != null) {
+ taskCommName = execContext.getTaskCommName();
+ }
+ }
LOG.info("Vertex: " + logIdentifier + " configured with TaskScheduler=" + taskSchedulerName +
", ContainerLauncher=" + containerLauncherName + ", TaskComm=" + taskCommName);
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index 34c7bc0..cba5c80 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -30,11 +30,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
import org.apache.tez.serviceplugins.api.ContainerLauncher;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
import org.apache.tez.serviceplugins.api.ContainerStopRequest;
-import org.apache.tez.dag.api.TezConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -269,8 +269,8 @@ public class ContainerLauncherImpl extends ContainerLauncher {
// nodes where containers will run at *this* point of time. This is
// *not* the cluster size and doesn't need to be.
- int numNodes = getContext().getNumNodes(
- TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
+ int numNodes =
+ getContext().getNumNodes(TezConstants.getTezYarnServicePluginName());
int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
if (poolSize < idealPoolSize) {