You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/21 03:36:39 UTC

[31/50] [abbrv] tez git commit: TEZ-2652. Cleanup the way services are specified for an AM and vertices. (sseth)

http://git-wip-us.apache.org/repos/asf/tez/blob/b0e82379/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index 7c6a6a4..594e6d3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -17,18 +17,21 @@ package org.apache.tez.dag.app.launcher;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.UnknownHostException;
+import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
 import org.apache.tez.serviceplugins.api.ContainerStopRequest;
-import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerLauncherContextImpl;
@@ -63,35 +66,35 @@ public class ContainerLauncherRouter extends AbstractService
   public ContainerLauncherRouter(Configuration conf, AppContext context,
                                  TaskAttemptListener taskAttemptListener,
                                  String workingDirectory,
-                                 String[] containerLauncherClassIdentifiers,
+                                 List<NamedEntityDescriptor> containerLauncherDescriptors,
                                  boolean isPureLocalMode) throws UnknownHostException {
     super(ContainerLauncherRouter.class.getName());
 
     this.appContext = context;
-    if (containerLauncherClassIdentifiers == null || containerLauncherClassIdentifiers.length == 0) {
+    if (containerLauncherDescriptors == null || containerLauncherDescriptors.isEmpty()) {
       if (isPureLocalMode) {
-        containerLauncherClassIdentifiers =
-            new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT};
+        containerLauncherDescriptors = Lists.newArrayList(new NamedEntityDescriptor(
+            TezConstants.getTezUberServicePluginName(), null));
       } else {
-        containerLauncherClassIdentifiers =
-            new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+        containerLauncherDescriptors = Lists.newArrayList(new NamedEntityDescriptor(
+            TezConstants.getTezYarnServicePluginName(), null));
       }
     }
-    containerLauncherContexts = new ContainerLauncherContext[containerLauncherClassIdentifiers.length];
-    containerLaunchers = new ContainerLauncher[containerLauncherClassIdentifiers.length];
-    containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[containerLauncherClassIdentifiers.length];
+    containerLauncherContexts = new ContainerLauncherContext[containerLauncherDescriptors.size()];
+    containerLaunchers = new ContainerLauncher[containerLauncherDescriptors.size()];
+    containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[containerLauncherDescriptors.size()];
 
 
-    for (int i = 0; i < containerLauncherClassIdentifiers.length; i++) {
+    for (int i = 0; i < containerLauncherDescriptors.size(); i++) {
       ContainerLauncherContext containerLauncherContext = new ContainerLauncherContextImpl(context, taskAttemptListener);
       containerLauncherContexts[i] = containerLauncherContext;
-      containerLaunchers[i] = createContainerLauncher(containerLauncherClassIdentifiers[i], context,
+      containerLaunchers[i] = createContainerLauncher(containerLauncherDescriptors.get(i), context,
           containerLauncherContext, taskAttemptListener, workingDirectory, isPureLocalMode, conf);
       containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService(containerLaunchers[i]);
     }
   }
 
-  private ContainerLauncher createContainerLauncher(String containerLauncherClassIdentifier,
+  private ContainerLauncher createContainerLauncher(NamedEntityDescriptor containerLauncherDescriptor,
                                                     AppContext context,
                                                     ContainerLauncherContext containerLauncherContext,
                                                     TaskAttemptListener taskAttemptListener,
@@ -99,11 +102,12 @@ public class ContainerLauncherRouter extends AbstractService
                                                     boolean isPureLocalMode,
                                                     Configuration conf) throws
       UnknownHostException {
-    if (containerLauncherClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
+    if (containerLauncherDescriptor.getEntityName().equals(
+        TezConstants.getTezYarnServicePluginName())) {
       LOG.info("Creating DefaultContainerLauncher");
       return new ContainerLauncherImpl(containerLauncherContext);
-    } else if (containerLauncherClassIdentifier
-        .equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
+    } else if (containerLauncherDescriptor.getEntityName()
+        .equals(TezConstants.getTezUberServicePluginName())) {
       LOG.info("Creating LocalContainerLauncher");
       // TODO Post TEZ-2003. LocalContainerLauncher is special cased, since it makes use of
       // extensive internals which are only available at runtime. Will likely require
@@ -111,10 +115,10 @@ public class ContainerLauncherRouter extends AbstractService
       return
           new LocalContainerLauncher(containerLauncherContext, context, taskAttemptListener, workingDirectory, isPureLocalMode);
     } else {
-      LOG.info("Creating container launcher : " + containerLauncherClassIdentifier);
+      LOG.info("Creating container launcher {}:{} ", containerLauncherDescriptor.getEntityName(), containerLauncherDescriptor.getClassName());
       Class<? extends ContainerLauncher> containerLauncherClazz =
           (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(
-              containerLauncherClassIdentifier);
+              containerLauncherDescriptor.getClassName());
       try {
         Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
             .getConstructor(ContainerLauncherContext.class);

http://git-wip-us.apache.org/repos/asf/tez/blob/b0e82379/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index d8cf080..67a088e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -34,6 +35,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -55,7 +58,6 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TaskLocationHint.TaskBasedLocationAffinity;
@@ -143,14 +145,14 @@ public class TaskSchedulerEventHandler extends AbstractService implements
    * @param eventHandler
    * @param containerSignatureMatcher
    * @param webUI
-   * @param schedulerClasses the list of scheduler classes / codes. Tez internal classes are represented as codes.
+   * @param schedulerDescriptors the list of scheduler descriptors. Tez internal classes will not have the class names populated.
    *                         An empty list defaults to using the YarnTaskScheduler as the only source.
    */
   @SuppressWarnings("rawtypes")
   public TaskSchedulerEventHandler(AppContext appContext,
       DAGClientServer clientService, EventHandler eventHandler, 
       ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI,
-      String [] schedulerClasses, boolean isPureLocalMode) {
+      List<NamedEntityDescriptor> schedulerDescriptors, boolean isPureLocalMode) {
     super(TaskSchedulerEventHandler.class.getName());
     this.appContext = appContext;
     this.eventHandler = eventHandler;
@@ -166,31 +168,34 @@ public class TaskSchedulerEventHandler extends AbstractService implements
 
     // Override everything for pure local mode
     if (isPureLocalMode) {
-      this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT};
+      this.taskSchedulerClasses = new String[] {TezConstants.getTezUberServicePluginName()};
       this.yarnTaskSchedulerIndex = -1;
     } else {
-      if (schedulerClasses == null || schedulerClasses.length ==0) {
-        this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+      if (schedulerDescriptors == null || schedulerDescriptors.isEmpty()) {
+        this.taskSchedulerClasses = new String[] {TezConstants.getTezYarnServicePluginName()};
         this.yarnTaskSchedulerIndex = 0;
       } else {
         // Ensure the YarnScheduler will be setup and note it's index. This will be responsible for heartbeats and YARN registration.
         int foundYarnTaskSchedulerIndex = -1;
-        for (int i = 0 ; i < schedulerClasses.length ; i++) {
-          if (schedulerClasses[i].equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
+
+        List<String> taskSchedulerClassList = new LinkedList<>();
+        for (int i = 0 ; i < schedulerDescriptors.size() ; i++) {
+          if (schedulerDescriptors.get(i).getEntityName().equals(
+              TezConstants.getTezYarnServicePluginName())) {
+            taskSchedulerClassList.add(schedulerDescriptors.get(i).getEntityName());
             foundYarnTaskSchedulerIndex = i;
-            break;
+          } else if (schedulerDescriptors.get(i).getEntityName().equals(
+              TezConstants.getTezUberServicePluginName())) {
+            taskSchedulerClassList.add(schedulerDescriptors.get(i).getEntityName());
+          } else {
+            taskSchedulerClassList.add(schedulerDescriptors.get(i).getClassName());
           }
         }
-        if (foundYarnTaskSchedulerIndex == -1) { // Not found. Add at the end.
-          this.taskSchedulerClasses = new String[schedulerClasses.length+1];
-          foundYarnTaskSchedulerIndex = this.taskSchedulerClasses.length -1;
-          for (int i = 0 ; i < schedulerClasses.length ; i++) { // Copy over the rest.
-            this.taskSchedulerClasses[i] = schedulerClasses[i];
-          }
-          this.taskSchedulerClasses[foundYarnTaskSchedulerIndex] = TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT;
-        } else {
-          this.taskSchedulerClasses = schedulerClasses;
+        if (foundYarnTaskSchedulerIndex == -1) {
+          taskSchedulerClassList.add(YarnTaskSchedulerService.class.getName());
+          foundYarnTaskSchedulerIndex = taskSchedulerClassList.size() -1;
         }
+        this.taskSchedulerClasses = taskSchedulerClassList.toArray(new String[taskSchedulerClassList.size()]);
         this.yarnTaskSchedulerIndex = foundYarnTaskSchedulerIndex;
       }
     }
@@ -419,10 +424,10 @@ public class TaskSchedulerEventHandler extends AbstractService implements
         new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl,
             customAppIdIdentifier, host, port, getConfig());
     TaskSchedulerContext wrappedContext = new TaskSchedulerContextImplWrapper(rawContext, appCallbackExecutor);
-    if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
+    if (schedulerClassName.equals(TezConstants.getTezYarnServicePluginName())) {
       LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
       return new YarnTaskSchedulerService(wrappedContext);
-    } else if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
+    } else if (schedulerClassName.equals(TezConstants.getTezUberServicePluginName())) {
       LOG.info("Creating TaskScheduler: Local TaskScheduler");
       return new LocalTaskSchedulerService(wrappedContext);
     } else {
@@ -454,7 +459,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
     for (int i = 0; i < taskSchedulerClasses.length; i++) {
       long customAppIdIdentifier;
       if (isPureLocalMode || taskSchedulerClasses[i].equals(
-          TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { // Use the app identifier from the appId.
+          TezConstants.getTezYarnServicePluginName())) { // Use the app identifier from the appId.
         customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp();
       } else {
         customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT);

http://git-wip-us.apache.org/repos/asf/tez/blob/b0e82379/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 21ae5f7..17feeaa 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -486,7 +487,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
       Credentials credentials, String jobUserName, int handlerConcurrency, int numConcurrentContainers) {
     super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime,
         isSession, workingDirectory, localDirs, logDirs,  new TezApiVersionInfo().getVersion(), 1,
-        credentials, jobUserName);
+        credentials, jobUserName, null);
     containerLauncherContext = new ContainerLauncherContextImpl(getContext(), getTaskAttemptListener());
     containerLauncher = new MockContainerLauncher(launcherGoFlag, containerLauncherContext);
     shutdownHandler = new MockDAGAppMasterShutdownHandler();
@@ -500,7 +501,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
   // use mock container launcher for tests
   @Override
   protected ContainerLauncherRouter createContainerLauncherRouter(final Configuration conf,
-                                                                  String[] containerLaunchers,
+                                                                  List<NamedEntityDescriptor> containerLauncherDescirptors,
                                                                   boolean isLocal)
       throws UnknownHostException {
     return new ContainerLauncherRouter(containerLauncher, getContext());

http://git-wip-us.apache.org/repos/asf/tez/blob/b0e82379/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 41a7373..e45b0a2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -50,6 +50,7 @@ import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
@@ -375,10 +376,10 @@ public class TestTaskAttemptListenerImplTezDag {
     public TaskAttemptListenerImplForTest(AppContext context,
                                           TaskHeartbeatHandler thh,
                                           ContainerHeartbeatHandler chh,
-                                          String[] taskCommunicatorClassIdentifiers,
+                                          List<NamedEntityDescriptor> taskCommDescriptors,
                                           Configuration conf,
                                           boolean isPureLocalMode) {
-      super(context, thh, chh, taskCommunicatorClassIdentifiers, conf,
+      super(context, thh, chh, taskCommDescriptors, conf,
           isPureLocalMode);
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b0e82379/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index 3ea0446..f191175 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -32,6 +32,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.client.DAGClientServer;
@@ -91,7 +93,7 @@ public class TestTaskSchedulerEventHandler {
     public MockTaskSchedulerEventHandler(AppContext appContext,
         DAGClientServer clientService, EventHandler eventHandler,
         ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) {
-      super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, new String[] {}, false);
+      super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, new LinkedList<NamedEntityDescriptor>(), false);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/b0e82379/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 966c95a..60d37e9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
 import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
@@ -130,7 +131,7 @@ class TestTaskSchedulerHelpers {
         EventHandler eventHandler,
         TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync,
         ContainerSignatureMatcher containerSignatureMatcher) {
-      super(appContext, null, eventHandler, containerSignatureMatcher, null, new String[]{}, false);
+      super(appContext, null, eventHandler, containerSignatureMatcher, null, new LinkedList<NamedEntityDescriptor>(), false);
       this.amrmClientAsync = amrmClientAsync;
       this.containerSignatureMatcher = containerSignatureMatcher;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/b0e82379/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
index ba17ba0..611e8cc 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
@@ -20,8 +20,8 @@ package org.apache.tez.examples;
 
 import java.io.IOException;
 import java.util.Set;
-import java.util.Map;
 
+import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -137,6 +137,9 @@ public class JoinValidate extends TezExampleBase {
   private DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions)
       throws IOException {
     DAG dag = DAG.create(getDagName());
+    if (getDefaultExecutionContext() != null) {
+      dag.setExecutionContext(getDefaultExecutionContext());
+    }
 
     // Configuration for intermediate output - shared by Vertex1 and Vertex2
     // This should only be setting selective keys from the underlying conf. Fix after there's a
@@ -153,18 +156,18 @@ public class JoinValidate extends TezExampleBase {
         MRInput
             .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
                 lhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build());
-    setVertexProperties(lhsVertex, getLhsVertexProperties());
+    setVertexExecutionContext(lhsVertex, getLhsExecutionContext());
 
     Vertex rhsVertex = Vertex.create(RHS_INPUT_NAME, ProcessorDescriptor.create(
         ForwardingProcessor.class.getName())).addDataSource("rhs",
         MRInput
             .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
                 rhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build());
-    setVertexProperties(rhsVertex, getRhsVertexProperties());
+    setVertexExecutionContext(rhsVertex, getRhsExecutionContext());
 
     Vertex joinValidateVertex = Vertex.create("joinvalidate", ProcessorDescriptor.create(
         JoinValidateProcessor.class.getName()), numPartitions);
-    setVertexProperties(joinValidateVertex, getValidateVertexProperties());
+    setVertexExecutionContext(joinValidateVertex, getValidateExecutionContext());
 
     Edge e1 = Edge.create(lhsVertex, joinValidateVertex, edgeConf.createDefaultEdgeProperty());
     Edge e2 = Edge.create(rhsVertex, joinValidateVertex, edgeConf.createDefaultEdgeProperty());
@@ -174,23 +177,25 @@ public class JoinValidate extends TezExampleBase {
     return dag;
   }
 
-  private void setVertexProperties(Vertex vertex, Map<String, String> properties) {
-    if (properties != null) {
-      for (Map.Entry<String, String> entry : properties.entrySet()) {
-        vertex.setConf(entry.getKey(), entry.getValue());
-      }
+  private void setVertexExecutionContext(Vertex vertex, VertexExecutionContext executionContext) {
+    if (executionContext != null) {
+      vertex.setExecutionContext(executionContext);
     }
   }
 
-  protected Map<String, String> getLhsVertexProperties() {
+  protected VertexExecutionContext getDefaultExecutionContext() {
     return null;
   }
 
-  protected Map<String, String> getRhsVertexProperties() {
+  protected VertexExecutionContext getLhsExecutionContext() {
     return null;
   }
 
-  protected Map<String, String> getValidateVertexProperties() {
+  protected VertexExecutionContext getRhsExecutionContext() {
+    return null;
+  }
+
+  protected VertexExecutionContext getValidateExecutionContext() {
     return null;
   }
 
@@ -240,4 +245,6 @@ public class JoinValidate extends TezExampleBase {
       }
     }
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b0e82379/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
index 85f9415..0002b42 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
@@ -121,7 +121,8 @@ public class TezTestServiceContainerLauncher extends ContainerLauncher {
   private RunContainerRequestProto constructRunContainerRequest(ContainerLaunchRequest launchRequest) throws
       IOException {
     RunContainerRequestProto.Builder builder = RunContainerRequestProto.newBuilder();
-    Preconditions.checkArgument(launchRequest.getTaskCommunicatorName().equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT));
+    Preconditions.checkArgument(launchRequest.getTaskCommunicatorName().equals(
+        TezConstants.getTezYarnServicePluginName()));
     InetSocketAddress address = (InetSocketAddress) getContext().getTaskCommunicatorMetaInfo(launchRequest.getTaskCommunicatorName());
     builder.setAmHost(address.getHostName()).setAmPort(address.getPort());
     builder.setAppAttemptNumber(appAttemptId.getAttemptId());

http://git-wip-us.apache.org/repos/asf/tez/blob/b0e82379/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java b/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
index e5d2e3b..f31476f 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
@@ -14,36 +14,46 @@
 
 package org.apache.tez.examples;
 
-import java.util.Map;
+
+import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
 
 public class JoinValidateConfigured extends JoinValidate {
 
-  private final Map<String, String> lhsProps;
-  private final Map<String, String> rhsProps;
-  private final Map<String, String> validateProps;
+  private final VertexExecutionContext defaultExecutionContext;
+  private final VertexExecutionContext lhsContext;
+  private final VertexExecutionContext rhsContext;
+  private final VertexExecutionContext validateContext;
   private final String dagNameSuffix;
 
-  public JoinValidateConfigured(Map<String, String> lhsProps, Map<String, String> rhsProps,
-                                Map<String, String> validateProps, String dagNameSuffix) {
-    this.lhsProps = lhsProps;
-    this.rhsProps = rhsProps;
-    this.validateProps = validateProps;
+  public JoinValidateConfigured(VertexExecutionContext defaultExecutionContext,
+                                VertexExecutionContext lhsContext,
+                                VertexExecutionContext rhsContext,
+                                VertexExecutionContext validateContext, String dagNameSuffix) {
+    this.defaultExecutionContext = defaultExecutionContext;
+    this.lhsContext = lhsContext;
+    this.rhsContext = rhsContext;
+    this.validateContext = validateContext;
     this.dagNameSuffix = dagNameSuffix;
   }
 
   @Override
-  protected Map<String, String> getLhsVertexProperties() {
-    return this.lhsProps;
+  protected VertexExecutionContext getDefaultExecutionContext() {
+    return this.defaultExecutionContext;
+  }
+
+  @Override
+  protected VertexExecutionContext getLhsExecutionContext() {
+    return this.lhsContext;
   }
 
   @Override
-  protected Map<String, String> getRhsVertexProperties() {
-    return this.rhsProps;
+  protected VertexExecutionContext getRhsExecutionContext() {
+    return this.rhsContext;
   }
 
   @Override
-  protected Map<String, String> getValidateVertexProperties() {
-    return this.validateProps;
+  protected VertexExecutionContext getValidateExecutionContext() {
+    return this.validateContext;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/b0e82379/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
index 45c70f1..07dd363 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
@@ -19,7 +19,6 @@ import static org.junit.Assert.assertEquals;
 import java.io.IOException;
 import java.util.Map;
 
-import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -28,9 +27,9 @@ import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
@@ -43,6 +42,10 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
 import org.apache.tez.service.MiniTezTestServiceCluster;
 import org.apache.tez.service.impl.ContainerRunnerImpl;
+import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
+import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
 import org.apache.tez.test.MiniTezCluster;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -72,9 +75,15 @@ public class TestExternalTezServices {
   private static final Path HASH_JOIN_EXPECTED_RESULT_PATH = new Path(SRC_DATA_DIR, "expectedOutputPath");
   private static final Path HASH_JOIN_OUTPUT_PATH = new Path(SRC_DATA_DIR, "outPath");
 
-  private static final Map<String, String> PROPS_EXT_SERVICE_PUSH = Maps.newHashMap();
-  private static final Map<String, String> PROPS_REGULAR_CONTAINERS = Maps.newHashMap();
-  private static final Map<String, String> PROPS_IN_AM = Maps.newHashMap();
+  private static final VertexExecutionContext EXECUTION_CONTEXT_EXT_SERVICE_PUSH =
+      VertexExecutionContext.create(
+          EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
+  private static final VertexExecutionContext EXECUTION_CONTEXT_REGULAR_CONTAINERS =
+      VertexExecutionContext.createExecuteInContainers(true);
+  private static final VertexExecutionContext EXECUTION_CONTEXT_IN_AM =
+      VertexExecutionContext.createExecuteInAm(true);
+
+  private static final VertexExecutionContext EXECUTION_CONTEXT_DEFAULT = EXECUTION_CONTEXT_EXT_SERVICE_PUSH;
 
   private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + TestExternalTezServices.class.getName()
       + "-tmpDir";
@@ -127,51 +136,28 @@ public class TestExternalTezServices {
     confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
     confForJobs.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
 
-    confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
-        EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskSchedulerService.class.getName());
-
-    confForJobs.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
-        EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceNoOpContainerLauncher.class.getName());
-
-    confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
-        EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskCommunicatorImpl.class.getName());
-
-    // Default all jobs to run via the service. Individual tests override this on a per vertex/dag level.
-    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, EXT_PUSH_ENTITY_NAME);
-    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, EXT_PUSH_ENTITY_NAME);
-    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, EXT_PUSH_ENTITY_NAME);
-
-    // Setup various executor sets
-    PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
-    PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
-    PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
-
-    PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, EXT_PUSH_ENTITY_NAME);
-    PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, EXT_PUSH_ENTITY_NAME);
-    PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, EXT_PUSH_ENTITY_NAME);
-
-    PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT);
-    PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT);
-    PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT);
+    TaskSchedulerDescriptor[] taskSchedulerDescriptors = new TaskSchedulerDescriptor[]{
+        TaskSchedulerDescriptor
+            .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName())};
+
+    ContainerLauncherDescriptor[] containerLauncherDescriptors = new ContainerLauncherDescriptor[]{
+        ContainerLauncherDescriptor
+            .create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName())};
+
+    TaskCommunicatorDescriptor[] taskCommunicatorDescriptors = new TaskCommunicatorDescriptor[]{
+        TaskCommunicatorDescriptor
+            .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName())};
 
+    ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true, true,
+        taskSchedulerDescriptors, containerLauncherDescriptors, taskCommunicatorDescriptors);
 
     // Create a session to use for all tests.
     TezConfiguration tezClientConf = new TezConfiguration(confForJobs);
 
-    sharedTezClient = TezClient.create(TestExternalTezServices.class.getSimpleName() + "_session",
-        tezClientConf, true);
+    sharedTezClient = TezClient
+        .newBuilder(TestExternalTezServices.class.getSimpleName() + "_session", tezClientConf)
+        .setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build();
+
     sharedTezClient.start();
     LOG.info("Shared TezSession started");
     sharedTezClient.waitTillReady();
@@ -225,71 +211,71 @@ public class TestExternalTezServices {
   @Test(timeout = 60000)
   public void testAllInService() throws Exception {
     int expectedExternalSubmissions = 4 + 3; //4 for 4 src files, 3 for num reducers.
-    runJoinValidate("AllInService", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH,
-        PROPS_EXT_SERVICE_PUSH, PROPS_EXT_SERVICE_PUSH);
+    runJoinValidate("AllInService", expectedExternalSubmissions, EXECUTION_CONTEXT_EXT_SERVICE_PUSH,
+        EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_EXT_SERVICE_PUSH);
   }
 
   @Test(timeout = 60000)
   public void testAllInContainers() throws Exception {
     int expectedExternalSubmissions = 0; // All in containers
-    runJoinValidate("AllInContainers", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS,
-        PROPS_REGULAR_CONTAINERS, PROPS_REGULAR_CONTAINERS);
+    runJoinValidate("AllInContainers", expectedExternalSubmissions, EXECUTION_CONTEXT_REGULAR_CONTAINERS,
+        EXECUTION_CONTEXT_REGULAR_CONTAINERS, EXECUTION_CONTEXT_REGULAR_CONTAINERS);
   }
 
   @Test(timeout = 60000)
   public void testAllInAM() throws Exception {
     int expectedExternalSubmissions = 0; // All in AM
-    runJoinValidate("AllInAM", expectedExternalSubmissions, PROPS_IN_AM,
-        PROPS_IN_AM, PROPS_IN_AM);
+    runJoinValidate("AllInAM", expectedExternalSubmissions, EXECUTION_CONTEXT_IN_AM,
+        EXECUTION_CONTEXT_IN_AM, EXECUTION_CONTEXT_IN_AM);
   }
 
   @Test(timeout = 60000)
   public void testMixed1() throws Exception { // M-ExtService, R-containers
     int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 0 for num reducers.
-    runJoinValidate("Mixed1", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH,
-        PROPS_EXT_SERVICE_PUSH, PROPS_REGULAR_CONTAINERS);
+    runJoinValidate("Mixed1", expectedExternalSubmissions, EXECUTION_CONTEXT_EXT_SERVICE_PUSH,
+        EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_REGULAR_CONTAINERS);
   }
 
   @Test(timeout = 60000)
   public void testMixed2() throws Exception { // M-Containers, R-ExtService
     int expectedExternalSubmissions = 0 + 3; // 3 for num reducers.
-    runJoinValidate("Mixed2", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS,
-        PROPS_REGULAR_CONTAINERS, PROPS_EXT_SERVICE_PUSH);
+    runJoinValidate("Mixed2", expectedExternalSubmissions, EXECUTION_CONTEXT_REGULAR_CONTAINERS,
+        EXECUTION_CONTEXT_REGULAR_CONTAINERS, EXECUTION_CONTEXT_EXT_SERVICE_PUSH);
   }
 
   @Test(timeout = 60000)
   public void testMixed3() throws Exception { // M - service, R-AM
     int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 0 for num reducers (in-AM).
-    runJoinValidate("Mixed3", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH,
-        PROPS_EXT_SERVICE_PUSH, PROPS_IN_AM);
+    runJoinValidate("Mixed3", expectedExternalSubmissions, EXECUTION_CONTEXT_EXT_SERVICE_PUSH,
+        EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_IN_AM);
   }
 
   @Test(timeout = 60000)
   public void testMixed4() throws Exception { // M - containers, R-AM
     int expectedExternalSubmissions = 0 + 0; // Nothing in external service.
-    runJoinValidate("Mixed4", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS,
-        PROPS_REGULAR_CONTAINERS, PROPS_IN_AM);
+    runJoinValidate("Mixed4", expectedExternalSubmissions, EXECUTION_CONTEXT_REGULAR_CONTAINERS,
+        EXECUTION_CONTEXT_REGULAR_CONTAINERS, EXECUTION_CONTEXT_IN_AM);
   }
 
   @Test(timeout = 60000)
   public void testMixed5() throws Exception { // M1 - containers, M2-extservice, R-AM
     int expectedExternalSubmissions = 2 + 0; // 2 for M2
-    runJoinValidate("Mixed5", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS,
-        PROPS_EXT_SERVICE_PUSH, PROPS_IN_AM);
+    runJoinValidate("Mixed5", expectedExternalSubmissions, EXECUTION_CONTEXT_REGULAR_CONTAINERS,
+        EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_IN_AM);
   }
 
   @Test(timeout = 60000)
   public void testMixed6() throws Exception { // M - AM, R - Service
     int expectedExternalSubmissions = 0 + 3; // 3 for R in service
-    runJoinValidate("Mixed6", expectedExternalSubmissions, PROPS_IN_AM,
-        PROPS_IN_AM, PROPS_EXT_SERVICE_PUSH);
+    runJoinValidate("Mixed6", expectedExternalSubmissions, EXECUTION_CONTEXT_IN_AM,
+        EXECUTION_CONTEXT_IN_AM, EXECUTION_CONTEXT_EXT_SERVICE_PUSH);
   }
 
   @Test(timeout = 60000)
   public void testMixed7() throws Exception { // M - AM, R - Containers
     int expectedExternalSubmissions = 0; // Nothing in ext service
-    runJoinValidate("Mixed7", expectedExternalSubmissions, PROPS_IN_AM,
-        PROPS_IN_AM, PROPS_REGULAR_CONTAINERS);
+    runJoinValidate("Mixed7", expectedExternalSubmissions, EXECUTION_CONTEXT_IN_AM,
+        EXECUTION_CONTEXT_IN_AM, EXECUTION_CONTEXT_REGULAR_CONTAINERS);
   }
 
   @Test(timeout = 60000)
@@ -303,10 +289,9 @@ public class TestExternalTezServices {
     DAG dag = DAG.create(ContainerRunnerImpl.DAG_NAME_INSTRUMENTED_FAILURES);
     Vertex v =Vertex.create("Vertex1", ProcessorDescriptor.create(SleepProcessor.class.getName()),
         3);
-    for (Map.Entry<String, String> prop : PROPS_EXT_SERVICE_PUSH.entrySet()) {
-      v.setConf(prop.getKey(), prop.getValue());
-    }
+    v.setExecutionContext(EXECUTION_CONTEXT_EXT_SERVICE_PUSH);
     dag.addVertex(v);
+
     DAGClient dagClient = sharedTezClient.submitDAG(dag);
     DAGStatus dagStatus = dagClient.waitForCompletion();
     assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
@@ -315,16 +300,16 @@ public class TestExternalTezServices {
 
   }
 
-  private void runJoinValidate(String name, int extExpectedCount, Map<String, String> lhsProps,
-                               Map<String, String> rhsProps,
-                               Map<String, String> validateProps) throws
+  private void runJoinValidate(String name, int extExpectedCount, VertexExecutionContext lhsContext,
+                               VertexExecutionContext rhsContext,
+                               VertexExecutionContext validateContext) throws
       Exception {
     int externalSubmissionCount = tezTestServiceCluster.getNumSubmissions();
 
     TezConfiguration tezConf = new TezConfiguration(confForJobs);
     JoinValidateConfigured joinValidate =
-        new JoinValidateConfigured(lhsProps, rhsProps,
-            validateProps, name);
+        new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT, lhsContext, rhsContext,
+            validateContext, name);
     String[] validateArgs = new String[]{"-disableSplitGrouping",
         HASH_JOIN_EXPECTED_RESULT_PATH.toString(), HASH_JOIN_OUTPUT_PATH.toString(), "3"};
     assertEquals(0, joinValidate.run(tezConf, validateArgs, sharedTezClient));

http://git-wip-us.apache.org/repos/asf/tez/blob/b0e82379/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index fff39a0..353fe23 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -62,6 +62,7 @@ import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.RelocalizationUtils;
 import org.apache.tez.runtime.api.ExecutionContext;
@@ -477,7 +478,9 @@ public class TezChild {
     }
 
     // Security framework already loaded the tokens into current ugi
-    TezUtilsInternal.addUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name()), defaultConf);
+    DAGProtos.ConfigurationProto confProto =
+        TezUtilsInternal.readUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name()));
+    TezUtilsInternal.addUserSpecifiedTezConfiguration(defaultConf, confProto.getConfKeyValuesList());
     UserGroupInformation.setConfiguration(defaultConf);
     Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
     TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier,