You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/14 22:58:54 UTC
[36/50] [abbrv] tez git commit: TEZ-2652. Cleanup the way services
are specified for an AM and vertices. (sseth)
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index 7c6a6a4..594e6d3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -17,18 +17,21 @@ package org.apache.tez.dag.app.launcher;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.UnknownHostException;
+import java.util.List;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
import org.apache.tez.serviceplugins.api.ContainerLauncher;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
import org.apache.tez.serviceplugins.api.ContainerStopRequest;
-import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerLauncherContextImpl;
@@ -63,35 +66,35 @@ public class ContainerLauncherRouter extends AbstractService
public ContainerLauncherRouter(Configuration conf, AppContext context,
TaskAttemptListener taskAttemptListener,
String workingDirectory,
- String[] containerLauncherClassIdentifiers,
+ List<NamedEntityDescriptor> containerLauncherDescriptors,
boolean isPureLocalMode) throws UnknownHostException {
super(ContainerLauncherRouter.class.getName());
this.appContext = context;
- if (containerLauncherClassIdentifiers == null || containerLauncherClassIdentifiers.length == 0) {
+ if (containerLauncherDescriptors == null || containerLauncherDescriptors.isEmpty()) {
if (isPureLocalMode) {
- containerLauncherClassIdentifiers =
- new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT};
+ containerLauncherDescriptors = Lists.newArrayList(new NamedEntityDescriptor(
+ TezConstants.getTezUberServicePluginName(), null));
} else {
- containerLauncherClassIdentifiers =
- new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+ containerLauncherDescriptors = Lists.newArrayList(new NamedEntityDescriptor(
+ TezConstants.getTezYarnServicePluginName(), null));
}
}
- containerLauncherContexts = new ContainerLauncherContext[containerLauncherClassIdentifiers.length];
- containerLaunchers = new ContainerLauncher[containerLauncherClassIdentifiers.length];
- containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[containerLauncherClassIdentifiers.length];
+ containerLauncherContexts = new ContainerLauncherContext[containerLauncherDescriptors.size()];
+ containerLaunchers = new ContainerLauncher[containerLauncherDescriptors.size()];
+ containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[containerLauncherDescriptors.size()];
- for (int i = 0; i < containerLauncherClassIdentifiers.length; i++) {
+ for (int i = 0; i < containerLauncherDescriptors.size(); i++) {
ContainerLauncherContext containerLauncherContext = new ContainerLauncherContextImpl(context, taskAttemptListener);
containerLauncherContexts[i] = containerLauncherContext;
- containerLaunchers[i] = createContainerLauncher(containerLauncherClassIdentifiers[i], context,
+ containerLaunchers[i] = createContainerLauncher(containerLauncherDescriptors.get(i), context,
containerLauncherContext, taskAttemptListener, workingDirectory, isPureLocalMode, conf);
containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService(containerLaunchers[i]);
}
}
- private ContainerLauncher createContainerLauncher(String containerLauncherClassIdentifier,
+ private ContainerLauncher createContainerLauncher(NamedEntityDescriptor containerLauncherDescriptor,
AppContext context,
ContainerLauncherContext containerLauncherContext,
TaskAttemptListener taskAttemptListener,
@@ -99,11 +102,12 @@ public class ContainerLauncherRouter extends AbstractService
boolean isPureLocalMode,
Configuration conf) throws
UnknownHostException {
- if (containerLauncherClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
+ if (containerLauncherDescriptor.getEntityName().equals(
+ TezConstants.getTezYarnServicePluginName())) {
LOG.info("Creating DefaultContainerLauncher");
return new ContainerLauncherImpl(containerLauncherContext);
- } else if (containerLauncherClassIdentifier
- .equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
+ } else if (containerLauncherDescriptor.getEntityName()
+ .equals(TezConstants.getTezUberServicePluginName())) {
LOG.info("Creating LocalContainerLauncher");
// TODO Post TEZ-2003. LocalContainerLauncher is special cased, since it makes use of
// extensive internals which are only available at runtime. Will likely require
@@ -111,10 +115,10 @@ public class ContainerLauncherRouter extends AbstractService
return
new LocalContainerLauncher(containerLauncherContext, context, taskAttemptListener, workingDirectory, isPureLocalMode);
} else {
- LOG.info("Creating container launcher : " + containerLauncherClassIdentifier);
+ LOG.info("Creating container launcher {}:{} ", containerLauncherDescriptor.getEntityName(), containerLauncherDescriptor.getClassName());
Class<? extends ContainerLauncher> containerLauncherClazz =
(Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(
- containerLauncherClassIdentifier);
+ containerLauncherDescriptor.getClassName());
try {
Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
.getConstructor(ContainerLauncherContext.class);
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index d8cf080..67a088e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -34,6 +35,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -55,7 +58,6 @@ import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TaskLocationHint.TaskBasedLocationAffinity;
@@ -143,14 +145,14 @@ public class TaskSchedulerEventHandler extends AbstractService implements
* @param eventHandler
* @param containerSignatureMatcher
* @param webUI
- * @param schedulerClasses the list of scheduler classes / codes. Tez internal classes are represented as codes.
+ * @param schedulerDescriptors the list of scheduler descriptors. Tez internal classes will not have the class names populated.
* An empty list defaults to using the YarnTaskScheduler as the only source.
*/
@SuppressWarnings("rawtypes")
public TaskSchedulerEventHandler(AppContext appContext,
DAGClientServer clientService, EventHandler eventHandler,
ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI,
- String [] schedulerClasses, boolean isPureLocalMode) {
+ List<NamedEntityDescriptor> schedulerDescriptors, boolean isPureLocalMode) {
super(TaskSchedulerEventHandler.class.getName());
this.appContext = appContext;
this.eventHandler = eventHandler;
@@ -166,31 +168,34 @@ public class TaskSchedulerEventHandler extends AbstractService implements
// Override everything for pure local mode
if (isPureLocalMode) {
- this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT};
+ this.taskSchedulerClasses = new String[] {TezConstants.getTezUberServicePluginName()};
this.yarnTaskSchedulerIndex = -1;
} else {
- if (schedulerClasses == null || schedulerClasses.length ==0) {
- this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+ if (schedulerDescriptors == null || schedulerDescriptors.isEmpty()) {
+ this.taskSchedulerClasses = new String[] {TezConstants.getTezYarnServicePluginName()};
this.yarnTaskSchedulerIndex = 0;
} else {
// Ensure the YarnScheduler will be set up and note its index. This will be responsible for heartbeats and YARN registration.
int foundYarnTaskSchedulerIndex = -1;
- for (int i = 0 ; i < schedulerClasses.length ; i++) {
- if (schedulerClasses[i].equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
+
+ List<String> taskSchedulerClassList = new LinkedList<>();
+ for (int i = 0 ; i < schedulerDescriptors.size() ; i++) {
+ if (schedulerDescriptors.get(i).getEntityName().equals(
+ TezConstants.getTezYarnServicePluginName())) {
+ taskSchedulerClassList.add(schedulerDescriptors.get(i).getEntityName());
foundYarnTaskSchedulerIndex = i;
- break;
+ } else if (schedulerDescriptors.get(i).getEntityName().equals(
+ TezConstants.getTezUberServicePluginName())) {
+ taskSchedulerClassList.add(schedulerDescriptors.get(i).getEntityName());
+ } else {
+ taskSchedulerClassList.add(schedulerDescriptors.get(i).getClassName());
}
}
- if (foundYarnTaskSchedulerIndex == -1) { // Not found. Add at the end.
- this.taskSchedulerClasses = new String[schedulerClasses.length+1];
- foundYarnTaskSchedulerIndex = this.taskSchedulerClasses.length -1;
- for (int i = 0 ; i < schedulerClasses.length ; i++) { // Copy over the rest.
- this.taskSchedulerClasses[i] = schedulerClasses[i];
- }
- this.taskSchedulerClasses[foundYarnTaskSchedulerIndex] = TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT;
- } else {
- this.taskSchedulerClasses = schedulerClasses;
+ if (foundYarnTaskSchedulerIndex == -1) {
+ taskSchedulerClassList.add(YarnTaskSchedulerService.class.getName());
+ foundYarnTaskSchedulerIndex = taskSchedulerClassList.size() -1;
}
+ this.taskSchedulerClasses = taskSchedulerClassList.toArray(new String[taskSchedulerClassList.size()]);
this.yarnTaskSchedulerIndex = foundYarnTaskSchedulerIndex;
}
}
@@ -419,10 +424,10 @@ public class TaskSchedulerEventHandler extends AbstractService implements
new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl,
customAppIdIdentifier, host, port, getConfig());
TaskSchedulerContext wrappedContext = new TaskSchedulerContextImplWrapper(rawContext, appCallbackExecutor);
- if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
+ if (schedulerClassName.equals(TezConstants.getTezYarnServicePluginName())) {
LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
return new YarnTaskSchedulerService(wrappedContext);
- } else if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
+ } else if (schedulerClassName.equals(TezConstants.getTezUberServicePluginName())) {
LOG.info("Creating TaskScheduler: Local TaskScheduler");
return new LocalTaskSchedulerService(wrappedContext);
} else {
@@ -454,7 +459,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
for (int i = 0; i < taskSchedulerClasses.length; i++) {
long customAppIdIdentifier;
if (isPureLocalMode || taskSchedulerClasses[i].equals(
- TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { // Use the app identifier from the appId.
+ TezConstants.getTezYarnServicePluginName())) { // Use the app identifier from the appId.
customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp();
} else {
customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT);
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 21ae5f7..17feeaa 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
import org.apache.tez.serviceplugins.api.ContainerLauncher;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -486,7 +487,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
Credentials credentials, String jobUserName, int handlerConcurrency, int numConcurrentContainers) {
super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime,
isSession, workingDirectory, localDirs, logDirs, new TezApiVersionInfo().getVersion(), 1,
- credentials, jobUserName);
+ credentials, jobUserName, null);
containerLauncherContext = new ContainerLauncherContextImpl(getContext(), getTaskAttemptListener());
containerLauncher = new MockContainerLauncher(launcherGoFlag, containerLauncherContext);
shutdownHandler = new MockDAGAppMasterShutdownHandler();
@@ -500,7 +501,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
// use mock container launcher for tests
@Override
protected ContainerLauncherRouter createContainerLauncherRouter(final Configuration conf,
- String[] containerLaunchers,
+ List<NamedEntityDescriptor> containerLauncherDescirptors,
boolean isLocal)
throws UnknownHostException {
return new ContainerLauncherRouter(containerLauncher, getContext());
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 41a7373..e45b0a2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -50,6 +50,7 @@ import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
@@ -375,10 +376,10 @@ public class TestTaskAttemptListenerImplTezDag {
public TaskAttemptListenerImplForTest(AppContext context,
TaskHeartbeatHandler thh,
ContainerHeartbeatHandler chh,
- String[] taskCommunicatorClassIdentifiers,
+ List<NamedEntityDescriptor> taskCommDescriptors,
Configuration conf,
boolean isPureLocalMode) {
- super(context, thh, chh, taskCommunicatorClassIdentifiers, conf,
+ super(context, thh, chh, taskCommDescriptors, conf,
isPureLocalMode);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index 3ea0446..f191175 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -32,6 +32,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.client.DAGClientServer;
@@ -91,7 +93,7 @@ public class TestTaskSchedulerEventHandler {
public MockTaskSchedulerEventHandler(AppContext appContext,
DAGClientServer clientService, EventHandler eventHandler,
ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) {
- super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, new String[] {}, false);
+ super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, new LinkedList<NamedEntityDescriptor>(), false);
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 966c95a..60d37e9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
@@ -130,7 +131,7 @@ class TestTaskSchedulerHelpers {
EventHandler eventHandler,
TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync,
ContainerSignatureMatcher containerSignatureMatcher) {
- super(appContext, null, eventHandler, containerSignatureMatcher, null, new String[]{}, false);
+ super(appContext, null, eventHandler, containerSignatureMatcher, null, new LinkedList<NamedEntityDescriptor>(), false);
this.amrmClientAsync = amrmClientAsync;
this.containerSignatureMatcher = containerSignatureMatcher;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
index ba17ba0..611e8cc 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
@@ -20,8 +20,8 @@ package org.apache.tez.examples;
import java.io.IOException;
import java.util.Set;
-import java.util.Map;
+import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -137,6 +137,9 @@ public class JoinValidate extends TezExampleBase {
private DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions)
throws IOException {
DAG dag = DAG.create(getDagName());
+ if (getDefaultExecutionContext() != null) {
+ dag.setExecutionContext(getDefaultExecutionContext());
+ }
// Configuration for intermediate output - shared by Vertex1 and Vertex2
// This should only be setting selective keys from the underlying conf. Fix after there's a
@@ -153,18 +156,18 @@ public class JoinValidate extends TezExampleBase {
MRInput
.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
lhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build());
- setVertexProperties(lhsVertex, getLhsVertexProperties());
+ setVertexExecutionContext(lhsVertex, getLhsExecutionContext());
Vertex rhsVertex = Vertex.create(RHS_INPUT_NAME, ProcessorDescriptor.create(
ForwardingProcessor.class.getName())).addDataSource("rhs",
MRInput
.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
rhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build());
- setVertexProperties(rhsVertex, getRhsVertexProperties());
+ setVertexExecutionContext(rhsVertex, getRhsExecutionContext());
Vertex joinValidateVertex = Vertex.create("joinvalidate", ProcessorDescriptor.create(
JoinValidateProcessor.class.getName()), numPartitions);
- setVertexProperties(joinValidateVertex, getValidateVertexProperties());
+ setVertexExecutionContext(joinValidateVertex, getValidateExecutionContext());
Edge e1 = Edge.create(lhsVertex, joinValidateVertex, edgeConf.createDefaultEdgeProperty());
Edge e2 = Edge.create(rhsVertex, joinValidateVertex, edgeConf.createDefaultEdgeProperty());
@@ -174,23 +177,25 @@ public class JoinValidate extends TezExampleBase {
return dag;
}
- private void setVertexProperties(Vertex vertex, Map<String, String> properties) {
- if (properties != null) {
- for (Map.Entry<String, String> entry : properties.entrySet()) {
- vertex.setConf(entry.getKey(), entry.getValue());
- }
+ private void setVertexExecutionContext(Vertex vertex, VertexExecutionContext executionContext) {
+ if (executionContext != null) {
+ vertex.setExecutionContext(executionContext);
}
}
- protected Map<String, String> getLhsVertexProperties() {
+ protected VertexExecutionContext getDefaultExecutionContext() {
return null;
}
- protected Map<String, String> getRhsVertexProperties() {
+ protected VertexExecutionContext getLhsExecutionContext() {
return null;
}
- protected Map<String, String> getValidateVertexProperties() {
+ protected VertexExecutionContext getRhsExecutionContext() {
+ return null;
+ }
+
+ protected VertexExecutionContext getValidateExecutionContext() {
return null;
}
@@ -240,4 +245,6 @@ public class JoinValidate extends TezExampleBase {
}
}
}
+
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
index 85f9415..0002b42 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
@@ -121,7 +121,8 @@ public class TezTestServiceContainerLauncher extends ContainerLauncher {
private RunContainerRequestProto constructRunContainerRequest(ContainerLaunchRequest launchRequest) throws
IOException {
RunContainerRequestProto.Builder builder = RunContainerRequestProto.newBuilder();
- Preconditions.checkArgument(launchRequest.getTaskCommunicatorName().equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT));
+ Preconditions.checkArgument(launchRequest.getTaskCommunicatorName().equals(
+ TezConstants.getTezYarnServicePluginName()));
InetSocketAddress address = (InetSocketAddress) getContext().getTaskCommunicatorMetaInfo(launchRequest.getTaskCommunicatorName());
builder.setAmHost(address.getHostName()).setAmPort(address.getPort());
builder.setAppAttemptNumber(appAttemptId.getAttemptId());
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java b/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
index e5d2e3b..f31476f 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
@@ -14,36 +14,46 @@
package org.apache.tez.examples;
-import java.util.Map;
+
+import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
public class JoinValidateConfigured extends JoinValidate {
- private final Map<String, String> lhsProps;
- private final Map<String, String> rhsProps;
- private final Map<String, String> validateProps;
+ private final VertexExecutionContext defaultExecutionContext;
+ private final VertexExecutionContext lhsContext;
+ private final VertexExecutionContext rhsContext;
+ private final VertexExecutionContext validateContext;
private final String dagNameSuffix;
- public JoinValidateConfigured(Map<String, String> lhsProps, Map<String, String> rhsProps,
- Map<String, String> validateProps, String dagNameSuffix) {
- this.lhsProps = lhsProps;
- this.rhsProps = rhsProps;
- this.validateProps = validateProps;
+ public JoinValidateConfigured(VertexExecutionContext defaultExecutionContext,
+ VertexExecutionContext lhsContext,
+ VertexExecutionContext rhsContext,
+ VertexExecutionContext validateContext, String dagNameSuffix) {
+ this.defaultExecutionContext = defaultExecutionContext;
+ this.lhsContext = lhsContext;
+ this.rhsContext = rhsContext;
+ this.validateContext = validateContext;
this.dagNameSuffix = dagNameSuffix;
}
@Override
- protected Map<String, String> getLhsVertexProperties() {
- return this.lhsProps;
+ protected VertexExecutionContext getDefaultExecutionContext() {
+ return this.defaultExecutionContext;
+ }
+
+ @Override
+ protected VertexExecutionContext getLhsExecutionContext() {
+ return this.lhsContext;
}
@Override
- protected Map<String, String> getRhsVertexProperties() {
- return this.rhsProps;
+ protected VertexExecutionContext getRhsExecutionContext() {
+ return this.rhsContext;
}
@Override
- protected Map<String, String> getValidateVertexProperties() {
- return this.validateProps;
+ protected VertexExecutionContext getValidateExecutionContext() {
+ return this.validateContext;
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
index 45c70f1..07dd363 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
@@ -19,7 +19,6 @@ import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Map;
-import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -28,9 +27,9 @@ import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
@@ -43,6 +42,10 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.processor.SleepProcessor;
import org.apache.tez.service.MiniTezTestServiceCluster;
import org.apache.tez.service.impl.ContainerRunnerImpl;
+import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
+import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.apache.tez.test.MiniTezCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -72,9 +75,15 @@ public class TestExternalTezServices {
private static final Path HASH_JOIN_EXPECTED_RESULT_PATH = new Path(SRC_DATA_DIR, "expectedOutputPath");
private static final Path HASH_JOIN_OUTPUT_PATH = new Path(SRC_DATA_DIR, "outPath");
- private static final Map<String, String> PROPS_EXT_SERVICE_PUSH = Maps.newHashMap();
- private static final Map<String, String> PROPS_REGULAR_CONTAINERS = Maps.newHashMap();
- private static final Map<String, String> PROPS_IN_AM = Maps.newHashMap();
+ private static final VertexExecutionContext EXECUTION_CONTEXT_EXT_SERVICE_PUSH =
+ VertexExecutionContext.create(
+ EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
+ private static final VertexExecutionContext EXECUTION_CONTEXT_REGULAR_CONTAINERS =
+ VertexExecutionContext.createExecuteInContainers(true);
+ private static final VertexExecutionContext EXECUTION_CONTEXT_IN_AM =
+ VertexExecutionContext.createExecuteInAm(true);
+
+ private static final VertexExecutionContext EXECUTION_CONTEXT_DEFAULT = EXECUTION_CONTEXT_EXT_SERVICE_PUSH;
private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + TestExternalTezServices.class.getName()
+ "-tmpDir";
@@ -127,51 +136,28 @@ public class TestExternalTezServices {
confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
confForJobs.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
- confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS,
- TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
- TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
- EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskSchedulerService.class.getName());
-
- confForJobs.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS,
- TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
- TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
- EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceNoOpContainerLauncher.class.getName());
-
- confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS,
- TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
- TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
- EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskCommunicatorImpl.class.getName());
-
- // Default all jobs to run via the service. Individual tests override this on a per vertex/dag level.
- confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, EXT_PUSH_ENTITY_NAME);
- confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, EXT_PUSH_ENTITY_NAME);
- confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, EXT_PUSH_ENTITY_NAME);
-
- // Setup various executor sets
- PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME,
- TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
- PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME,
- TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
- PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME,
- TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
-
- PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, EXT_PUSH_ENTITY_NAME);
- PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, EXT_PUSH_ENTITY_NAME);
- PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, EXT_PUSH_ENTITY_NAME);
-
- PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME,
- TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT);
- PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME,
- TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT);
- PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME,
- TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT);
+ TaskSchedulerDescriptor[] taskSchedulerDescriptors = new TaskSchedulerDescriptor[]{
+ TaskSchedulerDescriptor
+ .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName())};
+
+ ContainerLauncherDescriptor[] containerLauncherDescriptors = new ContainerLauncherDescriptor[]{
+ ContainerLauncherDescriptor
+ .create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName())};
+
+ TaskCommunicatorDescriptor[] taskCommunicatorDescriptors = new TaskCommunicatorDescriptor[]{
+ TaskCommunicatorDescriptor
+ .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName())};
+ ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true, true,
+ taskSchedulerDescriptors, containerLauncherDescriptors, taskCommunicatorDescriptors);
// Create a session to use for all tests.
TezConfiguration tezClientConf = new TezConfiguration(confForJobs);
- sharedTezClient = TezClient.create(TestExternalTezServices.class.getSimpleName() + "_session",
- tezClientConf, true);
+ sharedTezClient = TezClient
+ .newBuilder(TestExternalTezServices.class.getSimpleName() + "_session", tezClientConf)
+ .setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build();
+
sharedTezClient.start();
LOG.info("Shared TezSession started");
sharedTezClient.waitTillReady();
@@ -225,71 +211,71 @@ public class TestExternalTezServices {
@Test(timeout = 60000)
public void testAllInService() throws Exception {
int expectedExternalSubmissions = 4 + 3; //4 for 4 src files, 3 for num reducers.
- runJoinValidate("AllInService", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH,
- PROPS_EXT_SERVICE_PUSH, PROPS_EXT_SERVICE_PUSH);
+ runJoinValidate("AllInService", expectedExternalSubmissions, EXECUTION_CONTEXT_EXT_SERVICE_PUSH,
+ EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_EXT_SERVICE_PUSH);
}
@Test(timeout = 60000)
public void testAllInContainers() throws Exception {
int expectedExternalSubmissions = 0; // All in containers
- runJoinValidate("AllInContainers", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS,
- PROPS_REGULAR_CONTAINERS, PROPS_REGULAR_CONTAINERS);
+ runJoinValidate("AllInContainers", expectedExternalSubmissions, EXECUTION_CONTEXT_REGULAR_CONTAINERS,
+ EXECUTION_CONTEXT_REGULAR_CONTAINERS, EXECUTION_CONTEXT_REGULAR_CONTAINERS);
}
@Test(timeout = 60000)
public void testAllInAM() throws Exception {
int expectedExternalSubmissions = 0; // All in AM
- runJoinValidate("AllInAM", expectedExternalSubmissions, PROPS_IN_AM,
- PROPS_IN_AM, PROPS_IN_AM);
+ runJoinValidate("AllInAM", expectedExternalSubmissions, EXECUTION_CONTEXT_IN_AM,
+ EXECUTION_CONTEXT_IN_AM, EXECUTION_CONTEXT_IN_AM);
}
@Test(timeout = 60000)
public void testMixed1() throws Exception { // M-ExtService, R-containers
int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 0 for num reducers.
- runJoinValidate("Mixed1", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH,
- PROPS_EXT_SERVICE_PUSH, PROPS_REGULAR_CONTAINERS);
+ runJoinValidate("Mixed1", expectedExternalSubmissions, EXECUTION_CONTEXT_EXT_SERVICE_PUSH,
+ EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_REGULAR_CONTAINERS);
}
@Test(timeout = 60000)
public void testMixed2() throws Exception { // M-Containers, R-ExtService
int expectedExternalSubmissions = 0 + 3; // 3 for num reducers.
- runJoinValidate("Mixed2", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS,
- PROPS_REGULAR_CONTAINERS, PROPS_EXT_SERVICE_PUSH);
+ runJoinValidate("Mixed2", expectedExternalSubmissions, EXECUTION_CONTEXT_REGULAR_CONTAINERS,
+ EXECUTION_CONTEXT_REGULAR_CONTAINERS, EXECUTION_CONTEXT_EXT_SERVICE_PUSH);
}
@Test(timeout = 60000)
public void testMixed3() throws Exception { // M - service, R-AM
int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 0 for num reducers (in-AM).
- runJoinValidate("Mixed3", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH,
- PROPS_EXT_SERVICE_PUSH, PROPS_IN_AM);
+ runJoinValidate("Mixed3", expectedExternalSubmissions, EXECUTION_CONTEXT_EXT_SERVICE_PUSH,
+ EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_IN_AM);
}
@Test(timeout = 60000)
public void testMixed4() throws Exception { // M - containers, R-AM
int expectedExternalSubmissions = 0 + 0; // Nothing in external service.
- runJoinValidate("Mixed4", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS,
- PROPS_REGULAR_CONTAINERS, PROPS_IN_AM);
+ runJoinValidate("Mixed4", expectedExternalSubmissions, EXECUTION_CONTEXT_REGULAR_CONTAINERS,
+ EXECUTION_CONTEXT_REGULAR_CONTAINERS, EXECUTION_CONTEXT_IN_AM);
}
@Test(timeout = 60000)
public void testMixed5() throws Exception { // M1 - containers, M2-extservice, R-AM
int expectedExternalSubmissions = 2 + 0; // 2 for M2
- runJoinValidate("Mixed5", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS,
- PROPS_EXT_SERVICE_PUSH, PROPS_IN_AM);
+ runJoinValidate("Mixed5", expectedExternalSubmissions, EXECUTION_CONTEXT_REGULAR_CONTAINERS,
+ EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_IN_AM);
}
@Test(timeout = 60000)
public void testMixed6() throws Exception { // M - AM, R - Service
int expectedExternalSubmissions = 0 + 3; // 3 for R in service
- runJoinValidate("Mixed6", expectedExternalSubmissions, PROPS_IN_AM,
- PROPS_IN_AM, PROPS_EXT_SERVICE_PUSH);
+ runJoinValidate("Mixed6", expectedExternalSubmissions, EXECUTION_CONTEXT_IN_AM,
+ EXECUTION_CONTEXT_IN_AM, EXECUTION_CONTEXT_EXT_SERVICE_PUSH);
}
@Test(timeout = 60000)
public void testMixed7() throws Exception { // M - AM, R - Containers
int expectedExternalSubmissions = 0; // Nothing in ext service
- runJoinValidate("Mixed7", expectedExternalSubmissions, PROPS_IN_AM,
- PROPS_IN_AM, PROPS_REGULAR_CONTAINERS);
+ runJoinValidate("Mixed7", expectedExternalSubmissions, EXECUTION_CONTEXT_IN_AM,
+ EXECUTION_CONTEXT_IN_AM, EXECUTION_CONTEXT_REGULAR_CONTAINERS);
}
@Test(timeout = 60000)
@@ -303,10 +289,9 @@ public class TestExternalTezServices {
DAG dag = DAG.create(ContainerRunnerImpl.DAG_NAME_INSTRUMENTED_FAILURES);
Vertex v =Vertex.create("Vertex1", ProcessorDescriptor.create(SleepProcessor.class.getName()),
3);
- for (Map.Entry<String, String> prop : PROPS_EXT_SERVICE_PUSH.entrySet()) {
- v.setConf(prop.getKey(), prop.getValue());
- }
+ v.setExecutionContext(EXECUTION_CONTEXT_EXT_SERVICE_PUSH);
dag.addVertex(v);
+
DAGClient dagClient = sharedTezClient.submitDAG(dag);
DAGStatus dagStatus = dagClient.waitForCompletion();
assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
@@ -315,16 +300,16 @@ public class TestExternalTezServices {
}
- private void runJoinValidate(String name, int extExpectedCount, Map<String, String> lhsProps,
- Map<String, String> rhsProps,
- Map<String, String> validateProps) throws
+ private void runJoinValidate(String name, int extExpectedCount, VertexExecutionContext lhsContext,
+ VertexExecutionContext rhsContext,
+ VertexExecutionContext validateContext) throws
Exception {
int externalSubmissionCount = tezTestServiceCluster.getNumSubmissions();
TezConfiguration tezConf = new TezConfiguration(confForJobs);
JoinValidateConfigured joinValidate =
- new JoinValidateConfigured(lhsProps, rhsProps,
- validateProps, name);
+ new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT, lhsContext, rhsContext,
+ validateContext, name);
String[] validateArgs = new String[]{"-disableSplitGrouping",
HASH_JOIN_EXPECTED_RESULT_PATH.toString(), HASH_JOIN_OUTPUT_PATH.toString(), "3"};
assertEquals(0, joinValidate.run(tezConf, validateArgs, sharedTezClient));
http://git-wip-us.apache.org/repos/asf/tez/blob/cfd625e9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index fff39a0..353fe23 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -62,6 +62,7 @@ import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.RelocalizationUtils;
import org.apache.tez.runtime.api.ExecutionContext;
@@ -477,7 +478,9 @@ public class TezChild {
}
// Security framework already loaded the tokens into current ugi
- TezUtilsInternal.addUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name()), defaultConf);
+ DAGProtos.ConfigurationProto confProto =
+ TezUtilsInternal.readUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name()));
+ TezUtilsInternal.addUserSpecifiedTezConfiguration(defaultConf, confProto.getConfKeyValuesList());
UserGroupInformation.setConfiguration(defaultConf);
Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier,