You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/22 03:19:19 UTC

[29/50] [abbrv] tez git commit: TEZ-2653. Change service contexts to expose a user specified payload instead of the AM configuration. (sseth)

TEZ-2653. Change service contexts to expose a user specified payload
instead of the AM configuration. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4a18c5d5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4a18c5d5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4a18c5d5

Branch: refs/heads/TEZ-2003
Commit: 4a18c5d5baaaab4565da9d3c3085d98dfc91d07e
Parents: ec5acd8
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Jul 28 14:56:56 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 21 18:14:40 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  1 +
 .../tez/dag/api/NamedEntityDescriptor.java      |  7 ++-
 .../api/ContainerLauncherContext.java           |  5 +-
 .../api/TaskSchedulerContext.java               |  5 +-
 .../tez/dag/api/TaskCommunicatorContext.java    |  4 +-
 .../dag/app/ContainerLauncherContextImpl.java   | 10 ++--
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 20 +++++--
 .../dag/app/TaskAttemptListenerImpTezDag.java   | 19 ++++--
 .../dag/app/TaskCommunicatorContextImpl.java    | 12 ++--
 .../tez/dag/app/TezTaskCommunicatorImpl.java    | 12 +++-
 .../dag/app/launcher/ContainerLauncherImpl.java |  8 ++-
 .../app/launcher/ContainerLauncherRouter.java   | 24 +++++---
 .../app/launcher/LocalContainerLauncher.java    | 10 +++-
 .../dag/app/rm/LocalTaskSchedulerService.java   | 10 +++-
 .../dag/app/rm/TaskSchedulerContextImpl.java    | 12 ++--
 .../app/rm/TaskSchedulerContextImplWrapper.java |  6 +-
 .../dag/app/rm/TaskSchedulerEventHandler.java   | 61 ++++++++++++--------
 .../dag/app/rm/YarnTaskSchedulerService.java    | 21 +++++--
 .../apache/tez/dag/app/MockDAGAppMaster.java    | 15 ++++-
 .../app/TestTaskAttemptListenerImplTezDag.java  | 32 ++++++++--
 .../app/TestTaskAttemptListenerImplTezDag2.java | 12 +++-
 .../tez/dag/app/rm/TestContainerReuse.java      | 19 +++---
 .../tez/dag/app/rm/TestTaskScheduler.java       |  3 +-
 .../app/rm/TestTaskSchedulerEventHandler.java   | 18 +++++-
 .../dag/app/rm/TestTaskSchedulerHelpers.java    | 26 +++++++--
 .../TezTestServiceContainerLauncher.java        | 15 +++--
 .../rm/TezTestServiceTaskSchedulerService.java  | 10 +++-
 .../TezTestServiceTaskCommunicatorImpl.java     |  2 +-
 .../tez/tests/TestExternalTezServices.java      | 13 ++++-
 29 files changed, 294 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index a201942..b88044b 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -38,5 +38,6 @@ ALL CHANGES:
   TEZ-2005. Define basic interface for pluggable TaskScheduler.
   TEZ-2651. Pluggable services should not extend AbstractService.
   TEZ-2652. Cleanup the way services are specified for an AM and vertices.
+  TEZ-2653. Change service contexts to expose a user specified payload instead of the AM configuration.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java
index bad0d10..723d43f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java
@@ -17,7 +17,7 @@ package org.apache.tez.dag.api;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 
-public class NamedEntityDescriptor<T extends EntityDescriptor<T>> extends EntityDescriptor<T> {
+public class NamedEntityDescriptor<T extends NamedEntityDescriptor<T>> extends EntityDescriptor<NamedEntityDescriptor<T>>  {
   private final String entityName;
 
   @InterfaceAudience.Private
@@ -30,4 +30,9 @@ public class NamedEntityDescriptor<T extends EntityDescriptor<T>> extends Entity
   public String getEntityName() {
     return entityName;
   }
+
+  public T setUserPayload(UserPayload userPayload) {
+    super.setUserPayload(userPayload);
+    return (T) this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
index 836dc4a..5da38b8 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
@@ -16,9 +16,9 @@ package org.apache.tez.serviceplugins.api;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.UserPayload;
 
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
@@ -43,8 +43,7 @@ public interface ContainerLauncherContext {
 
   // Lookup APIs
 
-  // TODO TEZ-2003. To be replaced by getInitialPayload once the DAG API is changed.
-  Configuration getInitialConfiguration();
+  UserPayload getInitialUserPayload();
 
   int getNumNodes(String sourceName);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
index b2c8799..6f37641 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
@@ -20,7 +20,6 @@ import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -30,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.dag.api.UserPayload;
 
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
@@ -84,8 +84,7 @@ public interface TaskSchedulerContext {
 
   // Getters
 
-  // TODO TEZ-2003. To be replaced by getInitialPayload
-  public Configuration getInitialConfiguration();
+  public UserPayload getInitialUserPayload();
 
   public String getAppTrackingUrl();
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index ab32ec1..a1e94a3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -18,7 +18,6 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Set;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -36,8 +35,7 @@ public interface TaskCommunicatorContext {
 
   // TODO TEZ-2003 Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc.
 
-  // TODO TEZ-2003 To be replaced by getInitialPayload
-  Configuration getInitialConfiguration();
+  UserPayload getInitialUserPayload();
 
   ApplicationAttemptId getApplicationAttemptId();
   Credentials getCredentials();

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
index 997775a..92bbbdc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
@@ -14,10 +14,10 @@
 
 package org.apache.tez.dag.app;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.app.rm.container.AMContainerEvent;
@@ -33,10 +33,12 @@ public class ContainerLauncherContextImpl implements ContainerLauncherContext {
 
   private final AppContext context;
   private final TaskAttemptListener tal;
+  private final UserPayload initialUserPayload;
 
-  public ContainerLauncherContextImpl(AppContext appContext, TaskAttemptListener tal) {
+  public ContainerLauncherContextImpl(AppContext appContext, TaskAttemptListener tal, UserPayload initialUserPayload) {
     this.context = appContext;
     this.tal = tal;
+    this.initialUserPayload = initialUserPayload;
   }
 
   @Override
@@ -76,8 +78,8 @@ public class ContainerLauncherContextImpl implements ContainerLauncherContext {
   }
 
   @Override
-  public Configuration getInitialConfiguration() {
-    return context.getAMConf();
+  public UserPayload getInitialUserPayload() {
+    return initialUserPayload;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 4f75891..52621bd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -62,8 +62,10 @@ import com.google.common.collect.HashBiMap;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Options;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.SessionNotRunning;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
@@ -492,9 +494,12 @@ public class DAGAppMaster extends AbstractService {
     jobTokenSecretManager.addTokenForJob(
         appAttemptID.getApplicationId().toString(), sessionToken);
 
+    UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(amConf);
+
     //service to handle requests to TaskUmbilicalProtocol
     taskAttemptListener = createTaskAttemptListener(context,
-        taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorDescriptors, isLocal);
+        taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorDescriptors,
+        defaultPayload, isLocal);
     addIfService(taskAttemptListener, true);
 
     containerSignatureMatcher = createContainerSignatureMatcher();
@@ -540,9 +545,11 @@ public class DAGAppMaster extends AbstractService {
       }
     }
 
+
+
     this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
         clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService,
-        taskSchedulerDescriptors, isLocal);
+        taskSchedulerDescriptors, defaultPayload, isLocal);
     addIfService(taskSchedulerEventHandler, true);
 
     if (enableWebUIService()) {
@@ -560,7 +567,7 @@ public class DAGAppMaster extends AbstractService {
         taskSchedulerEventHandler);
     addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer);
 
-    this.containerLauncherRouter = createContainerLauncherRouter(conf, containerLauncherDescriptors, isLocal);
+    this.containerLauncherRouter = createContainerLauncherRouter(defaultPayload, containerLauncherDescriptors, isLocal);
     addIfService(containerLauncherRouter, true);
     dispatcher.register(NMCommunicatorEventType.class, containerLauncherRouter);
 
@@ -1071,10 +1078,11 @@ public class DAGAppMaster extends AbstractService {
                                                           TaskHeartbeatHandler thh,
                                                           ContainerHeartbeatHandler chh,
                                                           List<NamedEntityDescriptor> entityDescriptors,
+                                                          UserPayload defaultUserPayload,
                                                           boolean isLocal) {
     TaskAttemptListener lis =
         new TaskAttemptListenerImpTezDag(context, thh, chh,
-            entityDescriptors, amConf, isLocal);
+            entityDescriptors, defaultUserPayload, isLocal);
     return lis;
   }
 
@@ -1095,11 +1103,11 @@ public class DAGAppMaster extends AbstractService {
     return chh;
   }
 
-  protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf,
+  protected ContainerLauncherRouter createContainerLauncherRouter(UserPayload defaultPayload,
                                                                   List<NamedEntityDescriptor> containerLauncherDescriptors,
                                                                   boolean isLocal) throws
       UnknownHostException {
-    return new ContainerLauncherRouter(conf, context, taskAttemptListener, workingDirectory,
+    return new ContainerLauncherRouter(defaultPayload, context, taskAttemptListener, workingDirectory,
         containerLauncherDescriptors, isLocal);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 1e34184..cc109a6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -29,9 +29,9 @@ import java.util.concurrent.ConcurrentMap;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import org.apache.commons.collections4.ListUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
@@ -103,7 +103,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   public TaskAttemptListenerImpTezDag(AppContext context,
                                       TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
                                       List<NamedEntityDescriptor> taskCommunicatorDescriptors,
-                                      Configuration conf,
+                                      UserPayload defaultUserPayload,
                                       boolean isPureLocalMode) {
     super(TaskAttemptListenerImpTezDag.class.getName());
     this.context = context;
@@ -112,17 +112,26 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     if (taskCommunicatorDescriptors == null || taskCommunicatorDescriptors.isEmpty()) {
       if (isPureLocalMode) {
         taskCommunicatorDescriptors = Lists.newArrayList(new NamedEntityDescriptor(
-            TezConstants.getTezUberServicePluginName(), null));
+            TezConstants.getTezUberServicePluginName(), null).setUserPayload(defaultUserPayload));
       } else {
         taskCommunicatorDescriptors = Lists.newArrayList(new NamedEntityDescriptor(
-            TezConstants.getTezYarnServicePluginName(), null));
+            TezConstants.getTezYarnServicePluginName(), null).setUserPayload(defaultUserPayload));
       }
     }
     this.taskCommunicators = new TaskCommunicator[taskCommunicatorDescriptors.size()];
     this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorDescriptors.size()];
     this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorDescriptors.size()];
     for (int i = 0 ; i < taskCommunicatorDescriptors.size() ; i++) {
-      taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, conf, i);
+      UserPayload userPayload;
+      if (taskCommunicatorDescriptors.get(i).getEntityName()
+          .equals(TezConstants.getTezYarnServicePluginName()) ||
+          taskCommunicatorDescriptors.get(i).getEntityName()
+              .equals(TezConstants.getTezUberServicePluginName())) {
+        userPayload = defaultUserPayload;
+      } else {
+        userPayload = taskCommunicatorDescriptors.get(i).getUserPayload();
+      }
+      taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, userPayload, i);
       taskCommunicators[i] = createTaskCommunicator(taskCommunicatorDescriptors.get(i), i);
       taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskCommunicators[i]);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 035db93..cc315b7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -23,10 +23,10 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
 import org.apache.tez.dag.api.TaskHeartbeatRequest;
@@ -49,17 +49,17 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
   private final int taskCommunicatorIndex;
   private final ReentrantReadWriteLock.ReadLock dagChangedReadLock;
   private final ReentrantReadWriteLock.WriteLock dagChangedWriteLock;
-  private final Configuration conf;
+  private final UserPayload userPayload;
 
   private DAG dag;
 
   public TaskCommunicatorContextImpl(AppContext appContext,
                                      TaskAttemptListenerImpTezDag taskAttemptListener,
-                                     Configuration conf,
+                                     UserPayload userPayload,
                                      int taskCommunicatorIndex) {
     this.context = appContext;
     this.taskAttemptListener = taskAttemptListener;
-    this.conf = conf;
+    this.userPayload = userPayload;
     this.taskCommunicatorIndex = taskCommunicatorIndex;
 
     ReentrantReadWriteLock dagChangedLock = new ReentrantReadWriteLock();
@@ -68,8 +68,8 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
   }
 
   @Override
-  public Configuration getInitialConfiguration() {
-    return conf;
+  public UserPayload getInitialUserPayload() {
+    return userPayload;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 93b5b43..2a5c80e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -77,6 +77,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
 
   protected final String tokenIdentifier;
   protected final Token<JobTokenIdentifier> sessionToken;
+  protected final Configuration conf;
   protected InetSocketAddress address;
 
   protected volatile Server server;
@@ -119,6 +120,12 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
     this.taskUmbilical = new TezTaskUmbilicalProtocolImpl();
     this.tokenIdentifier = taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString();
     this.sessionToken = TokenCache.getSessionToken(taskCommunicatorContext.getCredentials());
+    try {
+      conf = TezUtils.createConfFromUserPayload(getContext().getInitialUserPayload());
+    } catch (IOException e) {
+      throw new TezUncheckedException(
+          "Unable to parse user payload for " + TezTaskCommunicatorImpl.class.getSimpleName(), e);
+    }
   }
 
   @Override
@@ -132,7 +139,6 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
   }
 
   protected void startRpcServer() {
-    Configuration conf = getContext().getInitialConfiguration();
     try {
       JobTokenSecretManager jobTokenSecretManager =
           new JobTokenSecretManager();
@@ -171,6 +177,10 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
     }
   }
 
+  protected Configuration getConf() {
+    return this.conf;
+  }
+
   private void refreshServiceAcls(Configuration configuration,
                                   PolicyProvider policyProvider) {
     this.server.refreshServiceAcl(configuration, policyProvider);

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index cba5c80..07d269d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
@@ -224,7 +225,12 @@ public class ContainerLauncherImpl extends ContainerLauncher {
 
   public ContainerLauncherImpl(ContainerLauncherContext containerLauncherContext) {
     super(containerLauncherContext);
-    this.conf = new Configuration(containerLauncherContext.getInitialConfiguration());
+    try {
+      this.conf = TezUtils.createConfFromUserPayload(containerLauncherContext.getInitialUserPayload());
+    } catch (IOException e) {
+      throw new TezUncheckedException(
+          "Failed to parse user payload for " + ContainerLauncherImpl.class.getSimpleName(), e);
+    }
     conf.setInt(
         CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
         0);

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index 594e6d3..2d56bfe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
@@ -63,7 +64,7 @@ public class ContainerLauncherRouter extends AbstractService
   }
 
   // Accepting conf to setup final parameters, if required.
-  public ContainerLauncherRouter(Configuration conf, AppContext context,
+  public ContainerLauncherRouter(UserPayload defaultUserPayload, AppContext context,
                                  TaskAttemptListener taskAttemptListener,
                                  String workingDirectory,
                                  List<NamedEntityDescriptor> containerLauncherDescriptors,
@@ -74,10 +75,10 @@ public class ContainerLauncherRouter extends AbstractService
     if (containerLauncherDescriptors == null || containerLauncherDescriptors.isEmpty()) {
       if (isPureLocalMode) {
         containerLauncherDescriptors = Lists.newArrayList(new NamedEntityDescriptor(
-            TezConstants.getTezUberServicePluginName(), null));
+            TezConstants.getTezUberServicePluginName(), null).setUserPayload(defaultUserPayload));
       } else {
         containerLauncherDescriptors = Lists.newArrayList(new NamedEntityDescriptor(
-            TezConstants.getTezYarnServicePluginName(), null));
+            TezConstants.getTezYarnServicePluginName(), null).setUserPayload(defaultUserPayload));
       }
     }
     containerLauncherContexts = new ContainerLauncherContext[containerLauncherDescriptors.size()];
@@ -86,10 +87,20 @@ public class ContainerLauncherRouter extends AbstractService
 
 
     for (int i = 0; i < containerLauncherDescriptors.size(); i++) {
-      ContainerLauncherContext containerLauncherContext = new ContainerLauncherContextImpl(context, taskAttemptListener);
+      UserPayload userPayload;
+      if (containerLauncherDescriptors.get(i).getEntityName()
+          .equals(TezConstants.getTezYarnServicePluginName()) ||
+          containerLauncherDescriptors.get(i).getEntityName()
+              .equals(TezConstants.getTezUberServicePluginName())) {
+        userPayload = defaultUserPayload;
+      } else {
+        userPayload = containerLauncherDescriptors.get(i).getUserPayload();
+      }
+      ContainerLauncherContext containerLauncherContext =
+          new ContainerLauncherContextImpl(context, taskAttemptListener, userPayload);
       containerLauncherContexts[i] = containerLauncherContext;
       containerLaunchers[i] = createContainerLauncher(containerLauncherDescriptors.get(i), context,
-          containerLauncherContext, taskAttemptListener, workingDirectory, isPureLocalMode, conf);
+          containerLauncherContext, taskAttemptListener, workingDirectory, isPureLocalMode);
       containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService(containerLaunchers[i]);
     }
   }
@@ -99,8 +110,7 @@ public class ContainerLauncherRouter extends AbstractService
                                                     ContainerLauncherContext containerLauncherContext,
                                                     TaskAttemptListener taskAttemptListener,
                                                     String workingDirectory,
-                                                    boolean isPureLocalMode,
-                                                    Configuration conf) throws
+                                                    boolean isPureLocalMode) throws
       UnknownHostException {
     if (containerLauncherDescriptor.getEntityName().equals(
         TezConstants.getTezYarnServicePluginName())) {

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 3975111..1d3e6df 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -44,6 +44,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -129,7 +130,14 @@ public class LocalContainerLauncher extends ContainerLauncher {
         System.getenv(Environment.NM_HOST.name());
     executionContext = new ExecutionContextImpl(host);
 
-    numExecutors = getContext().getInitialConfiguration().getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
+    Configuration conf;
+    try {
+      conf = TezUtils.createConfFromUserPayload(getContext().getInitialUserPayload());
+    } catch (IOException e) {
+      throw new TezUncheckedException(
+          "Failed to parse user payload for " + LocalContainerLauncher.class.getSimpleName(), e);
+    }
+    numExecutors = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
         TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT);
     Preconditions.checkState(numExecutors >=1, "Must have at least 1 executor");
     ExecutorService rawExecutor = Executors.newFixedThreadPool(numExecutors,

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index 1d889ae..395589c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.app.rm;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -27,6 +28,7 @@ import java.util.LinkedHashMap;
 
 import com.google.common.primitives.Ints;
 
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 import org.slf4j.Logger;
@@ -65,7 +67,13 @@ public class LocalTaskSchedulerService extends TaskScheduler {
     this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl();
     this.containerSignatureMatcher = taskSchedulerContext.getContainerSignatureMatcher();
     this.customContainerAppId = taskSchedulerContext.getCustomClusterIdentifier();
-    this.conf = taskSchedulerContext.getInitialConfiguration();
+    try {
+      this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload());
+    } catch (IOException e) {
+      throw new TezUncheckedException(
+          "Failed to deserialize payload for " + LocalTaskSchedulerService.class.getSimpleName(),
+          e);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
index 890870e..7f1d5a3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
@@ -18,7 +18,6 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -28,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.common.ContainerSignatureMatcher;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 
@@ -40,12 +40,12 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext {
   private final long customClusterIdentifier;
   private final String appHostName;
   private final int clientPort;
-  private final Configuration conf;
+  private final UserPayload initialUserPayload;
 
   public TaskSchedulerContextImpl(TaskSchedulerEventHandler tseh, AppContext appContext,
                                   int schedulerId, String trackingUrl, long customClusterIdentifier,
                                   String appHostname, int clientPort,
-                                  Configuration conf) {
+                                  UserPayload initialUserPayload) {
     this.tseh = tseh;
     this.appContext = appContext;
     this.schedulerId = schedulerId;
@@ -53,7 +53,7 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext {
     this.customClusterIdentifier = customClusterIdentifier;
     this.appHostName = appHostname;
     this.clientPort = clientPort;
-    this.conf = conf;
+    this.initialUserPayload = initialUserPayload;
 
   }
 
@@ -110,8 +110,8 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext {
   }
 
   @Override
-  public Configuration getInitialConfiguration() {
-    return conf;
+  public UserPayload getInitialUserPayload() {
+    return initialUserPayload;
   }
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
index e64ef43..9e4c8e0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
@@ -27,7 +27,6 @@ import java.util.concurrent.Future;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -37,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.common.ContainerSignatureMatcher;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 
 /**
@@ -132,8 +132,8 @@ class TaskSchedulerContextImplWrapper implements TaskSchedulerContext {
   // does not use locks.
 
   @Override
-  public Configuration getInitialConfiguration() {
-    return real.getInitialConfiguration();
+  public UserPayload getInitialUserPayload() {
+    return real.getInitialUserPayload();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 8038e2c..4899f82 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -37,6 +37,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -114,7 +115,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
   private AtomicBoolean shouldUnregisterFlag =
       new AtomicBoolean(false);
   private final WebUIService webUI;
-  private final String[] taskSchedulerClasses;
+  private final NamedEntityDescriptor[] taskSchedulerDescriptors;
   protected final TaskScheduler[]taskSchedulers;
   protected final ServicePluginLifecycleAbstractService []taskSchedulerServiceWrappers;
 
@@ -152,7 +153,8 @@ public class TaskSchedulerEventHandler extends AbstractService implements
   public TaskSchedulerEventHandler(AppContext appContext,
       DAGClientServer clientService, EventHandler eventHandler, 
       ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI,
-      List<NamedEntityDescriptor> schedulerDescriptors, boolean isPureLocalMode) {
+      List<NamedEntityDescriptor> schedulerDescriptors, UserPayload defaultPayload,
+                                   boolean isPureLocalMode) {
     super(TaskSchedulerEventHandler.class.getName());
     this.appContext = appContext;
     this.eventHandler = eventHandler;
@@ -168,39 +170,50 @@ public class TaskSchedulerEventHandler extends AbstractService implements
 
     // Override everything for pure local mode
     if (isPureLocalMode) {
-      this.taskSchedulerClasses = new String[] {TezConstants.getTezUberServicePluginName()};
+      this.taskSchedulerDescriptors = new NamedEntityDescriptor[]{
+          new NamedEntityDescriptor(TezConstants.getTezUberServicePluginName(), null)
+              .setUserPayload(defaultPayload)};
       this.yarnTaskSchedulerIndex = -1;
     } else {
       if (schedulerDescriptors == null || schedulerDescriptors.isEmpty()) {
-        this.taskSchedulerClasses = new String[] {TezConstants.getTezYarnServicePluginName()};
+        this.taskSchedulerDescriptors = new NamedEntityDescriptor[]{
+            new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
+                .setUserPayload(defaultPayload)};
         this.yarnTaskSchedulerIndex = 0;
       } else {
         // Ensure the YarnScheduler will be setup and note it's index. This will be responsible for heartbeats and YARN registration.
         int foundYarnTaskSchedulerIndex = -1;
 
-        List<String> taskSchedulerClassList = new LinkedList<>();
+        List<NamedEntityDescriptor> schedulerDescriptorList = new LinkedList<>();
         for (int i = 0 ; i < schedulerDescriptors.size() ; i++) {
           if (schedulerDescriptors.get(i).getEntityName().equals(
               TezConstants.getTezYarnServicePluginName())) {
-            taskSchedulerClassList.add(schedulerDescriptors.get(i).getEntityName());
+            schedulerDescriptorList.add(
+                new NamedEntityDescriptor(schedulerDescriptors.get(i).getEntityName(), null)
+                    .setUserPayload(
+                        defaultPayload));
             foundYarnTaskSchedulerIndex = i;
           } else if (schedulerDescriptors.get(i).getEntityName().equals(
               TezConstants.getTezUberServicePluginName())) {
-            taskSchedulerClassList.add(schedulerDescriptors.get(i).getEntityName());
+            schedulerDescriptorList.add(
+                new NamedEntityDescriptor(schedulerDescriptors.get(i).getEntityName(), null)
+                    .setUserPayload(
+                        defaultPayload));
           } else {
-            taskSchedulerClassList.add(schedulerDescriptors.get(i).getClassName());
+            schedulerDescriptorList.add(schedulerDescriptors.get(i));
           }
         }
         if (foundYarnTaskSchedulerIndex == -1) {
-          taskSchedulerClassList.add(YarnTaskSchedulerService.class.getName());
-          foundYarnTaskSchedulerIndex = taskSchedulerClassList.size() -1;
+          schedulerDescriptorList.add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null).setUserPayload(
+              defaultPayload));
+          foundYarnTaskSchedulerIndex = schedulerDescriptorList.size() -1;
         }
-        this.taskSchedulerClasses = taskSchedulerClassList.toArray(new String[taskSchedulerClassList.size()]);
+        this.taskSchedulerDescriptors = schedulerDescriptorList.toArray(new NamedEntityDescriptor[schedulerDescriptorList.size()]);
         this.yarnTaskSchedulerIndex = foundYarnTaskSchedulerIndex;
       }
     }
-    taskSchedulers = new TaskScheduler[this.taskSchedulerClasses.length];
-    taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[this.taskSchedulerClasses.length];
+    taskSchedulers = new TaskScheduler[this.taskSchedulerDescriptors.length];
+    taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[this.taskSchedulerDescriptors.length];
   }
 
   public Map<ApplicationAccessType, String> getApplicationAcls() {
@@ -417,23 +430,24 @@ public class TaskSchedulerEventHandler extends AbstractService implements
 
   private TaskScheduler createTaskScheduler(String host, int port, String trackingUrl,
                                                    AppContext appContext,
-                                                   String schedulerClassName,
+                                                   NamedEntityDescriptor taskSchedulerDescriptor,
                                                    long customAppIdIdentifier,
                                                    int schedulerId) {
     TaskSchedulerContext rawContext =
         new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl,
-            customAppIdIdentifier, host, port, getConfig());
+            customAppIdIdentifier, host, port, taskSchedulerDescriptor.getUserPayload());
     TaskSchedulerContext wrappedContext = new TaskSchedulerContextImplWrapper(rawContext, appCallbackExecutor);
-    if (schedulerClassName.equals(TezConstants.getTezYarnServicePluginName())) {
+    String schedulerName = taskSchedulerDescriptor.getEntityName();
+    if (schedulerName.equals(TezConstants.getTezYarnServicePluginName())) {
       LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
       return new YarnTaskSchedulerService(wrappedContext);
-    } else if (schedulerClassName.equals(TezConstants.getTezUberServicePluginName())) {
+    } else if (schedulerName.equals(TezConstants.getTezUberServicePluginName())) {
       LOG.info("Creating TaskScheduler: Local TaskScheduler");
       return new LocalTaskSchedulerService(wrappedContext);
     } else {
-      LOG.info("Creating custom TaskScheduler: " + schedulerClassName);
+      LOG.info("Creating custom TaskScheduler {}:{}", taskSchedulerDescriptor.getEntityName(), taskSchedulerDescriptor.getClassName());
       Class<? extends TaskScheduler> taskSchedulerClazz =
-          (Class<? extends TaskScheduler>) ReflectionUtils.getClazz(schedulerClassName);
+          (Class<? extends TaskScheduler>) ReflectionUtils.getClazz(taskSchedulerDescriptor.getClassName());
       try {
         Constructor<? extends TaskScheduler> ctor = taskSchedulerClazz
             .getConstructor(TaskSchedulerContext.class);
@@ -453,21 +467,20 @@ public class TaskSchedulerEventHandler extends AbstractService implements
 
   @VisibleForTesting
   protected void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) {
-    // TODO Add error checking for components being used in the Vertex when running in pure local mode.
     // Iterate over the list and create all the taskSchedulers
     int j = 0;
-    for (int i = 0; i < taskSchedulerClasses.length; i++) {
+    for (int i = 0; i < taskSchedulerDescriptors.length; i++) {
       long customAppIdIdentifier;
-      if (isPureLocalMode || taskSchedulerClasses[i].equals(
+      if (isPureLocalMode || taskSchedulerDescriptors[i].getEntityName().equals(
           TezConstants.getTezYarnServicePluginName())) { // Use the app identifier from the appId.
         customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp();
       } else {
         customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT);
       }
-      LOG.info("ClusterIdentifier for TaskScheduler [" + i + ":" + taskSchedulerClasses[i] + "]=" +
+      LOG.info("ClusterIdentifier for TaskScheduler [" + i + ":" + taskSchedulerDescriptors[i].getEntityName() + "]=" +
           customAppIdIdentifier);
       taskSchedulers[i] = createTaskScheduler(host, port,
-          trackingUrl, appContext, taskSchedulerClasses[i], customAppIdIdentifier, i);
+          trackingUrl, appContext, taskSchedulerDescriptors[i], customAppIdIdentifier, i);
       taskSchedulerServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskSchedulers[i]);
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 08821b0..b4d1f26 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -31,13 +31,11 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AMState;
@@ -72,7 +70,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /* TODO not yet updating cluster nodes on every allocate response
  * from RMContainerRequestor
@@ -220,7 +217,13 @@ public class YarnTaskSchedulerService extends TaskScheduler
     this.appHostName = taskSchedulerContext.getAppHostName();
     this.appHostPort = taskSchedulerContext.getAppClientPort();
     this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl();
-    this.conf = taskSchedulerContext.getInitialConfiguration();
+    try {
+      this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload());
+    } catch (IOException e) {
+      throw new TezUncheckedException(
+          "Failed to deserialize payload for " + YarnTaskSchedulerService.class.getSimpleName(),
+          e);
+    }
   }
 
   @Private
@@ -233,7 +236,13 @@ public class YarnTaskSchedulerService extends TaskScheduler
     this.appHostName = taskSchedulerContext.getAppHostName();
     this.appHostPort = taskSchedulerContext.getAppClientPort();
     this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl();
-    this.conf = taskSchedulerContext.getInitialConfiguration();
+    try {
+      this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload());
+    } catch (IOException e) {
+      throw new TezUncheckedException(
+          "Failed to deserialize payload for " + YarnTaskSchedulerService.class.getSimpleName(),
+          e);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 17feeaa..0723dbc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.app;
 
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadMXBean;
 import java.net.UnknownHostException;
@@ -34,7 +35,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -488,7 +491,15 @@ public class MockDAGAppMaster extends DAGAppMaster {
     super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime,
         isSession, workingDirectory, localDirs, logDirs,  new TezApiVersionInfo().getVersion(), 1,
         credentials, jobUserName, null);
-    containerLauncherContext = new ContainerLauncherContextImpl(getContext(), getTaskAttemptListener());
+    Configuration conf = new Configuration(false);
+    UserPayload userPayload;
+    try {
+      userPayload = TezUtils.createUserPayloadFromConf(conf);
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    containerLauncherContext =
+        new ContainerLauncherContextImpl(getContext(), getTaskAttemptListener(), userPayload);
     containerLauncher = new MockContainerLauncher(launcherGoFlag, containerLauncherContext);
     shutdownHandler = new MockDAGAppMasterShutdownHandler();
     this.initFailFlag = initFailFlag;
@@ -500,7 +511,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
 
   // use mock container launcher for tests
   @Override
-  protected ContainerLauncherRouter createContainerLauncherRouter(final Configuration conf,
+  protected ContainerLauncherRouter createContainerLauncherRouter(final UserPayload defaultUserPayload,
                                                                   List<NamedEntityDescriptor> containerLauncherDescirptors,
                                                                   boolean isLocal)
       throws UnknownHostException {

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index e45b0a2..1cb69a8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -47,11 +47,14 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskHeartbeatRequest;
@@ -135,8 +138,15 @@ public class TestTaskAttemptListenerImplTezDag {
     doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class));
     doReturn(container).when(amContainer).getContainer();
 
+    Configuration conf = new TezConfiguration();
+    UserPayload defaultPayload;
+    try {
+      defaultPayload = TezUtils.createUserPayloadFromConf(conf);
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
     taskAttemptListener = new TaskAttemptListenerImplForTest(appContext,
-        mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, null, false);
+        mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, defaultPayload, false);
     TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator();
     TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
 
@@ -299,8 +309,14 @@ public class TestTaskAttemptListenerImplTezDag {
         new JobTokenSecretManager());
     sessionToken.setService(identifier.getJobId());
     TokenCache.setSessionToken(sessionToken, credentials);
+    UserPayload userPayload = null;
+    try {
+      userPayload = TezUtils.createUserPayloadFromConf(conf);
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
     taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext,
-        mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, conf, false);
+        mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, userPayload, false);
     // no exception happen, should started properly
     taskAttemptListener.init(conf);
     taskAttemptListener.start();
@@ -319,8 +335,14 @@ public class TestTaskAttemptListenerImplTezDag {
       TokenCache.setSessionToken(sessionToken, credentials);
 
       conf.set(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE, port + "-" + port);
+      UserPayload userPayload = null;
+      try {
+        userPayload = TezUtils.createUserPayloadFromConf(conf);
+      } catch (IOException e) {
+        throw new TezUncheckedException(e);
+      }
       taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext,
-          mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, conf, false);
+          mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, userPayload, false);
       taskAttemptListener.init(conf);
       taskAttemptListener.start();
       int resultedPort = taskAttemptListener.getTaskCommunicator(0).getAddress().getPort();
@@ -377,9 +399,9 @@ public class TestTaskAttemptListenerImplTezDag {
                                           TaskHeartbeatHandler thh,
                                           ContainerHeartbeatHandler chh,
                                           List<NamedEntityDescriptor> taskCommDescriptors,
-                                          Configuration conf,
+                                          UserPayload userPayload,
                                           boolean isPureLocalMode) {
-      super(context, thh, chh, taskCommDescriptors, conf,
+      super(context, thh, chh, taskCommDescriptors, userPayload,
           isPureLocalMode);
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
index 8d776fb..1c82bd8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
@@ -22,9 +22,11 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -34,6 +36,10 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
@@ -51,7 +57,7 @@ import org.mockito.ArgumentCaptor;
 public class TestTaskAttemptListenerImplTezDag2 {
 
   @Test(timeout = 5000)
-  public void testTaskAttemptFailedKilled() {
+  public void testTaskAttemptFailedKilled() throws IOException {
     ApplicationId appId = ApplicationId.newInstance(1000, 1);
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
     Credentials credentials = new Credentials();
@@ -73,9 +79,11 @@ public class TestTaskAttemptListenerImplTezDag2 {
     doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class));
     doReturn(container).when(amContainer).getContainer();
 
+    Configuration conf = new TezConfiguration();
+    UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
     TaskAttemptListenerImpTezDag taskAttemptListener =
         new TaskAttemptListenerImpTezDag(appContext, mock(TaskHeartbeatHandler.class),
-            mock(ContainerHeartbeatHandler.class), null, null, false);
+            mock(ContainerHeartbeatHandler.class), null, userPayload, false);
 
     TaskSpec taskSpec1 = mock(TaskSpec.class);
     TezTaskAttemptID taskAttemptId1 = mock(TezTaskAttemptID.class);

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 88f6066..8e8224a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -37,6 +37,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.tez.common.TezUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -144,7 +145,7 @@ public class TestContainerReuse {
     TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
         new TaskSchedulerEventHandlerForTest(
           appContext, eventHandler, rmClient,
-          new AlwaysMatchesContainerMatcher());
+          new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(conf));
     TaskSchedulerEventHandler taskSchedulerEventHandler =
         spy(taskSchedulerEventHandlerReal);
     taskSchedulerEventHandler.init(conf);
@@ -279,7 +280,7 @@ public class TestContainerReuse {
 
     TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
       new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient,
-        new AlwaysMatchesContainerMatcher());
+        new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(conf));
     TaskSchedulerEventHandler taskSchedulerEventHandler =
       spy(taskSchedulerEventHandlerReal);
     taskSchedulerEventHandler.init(conf);
@@ -378,7 +379,7 @@ public class TestContainerReuse {
     doReturn(dagID).when(appContext).getCurrentDAGID();
     doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
 
-    TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher());
+    TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
     TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
     taskSchedulerEventHandler.init(tezConf);
     taskSchedulerEventHandler.start();
@@ -514,7 +515,7 @@ public class TestContainerReuse {
 
     //Use ContainerContextMatcher here.  Otherwise it would not match the JVM options
     TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
-        new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new ContainerContextMatcher());
+        new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new ContainerContextMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
     TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
     taskSchedulerEventHandler.init(tezConf);
     taskSchedulerEventHandler.start();
@@ -709,7 +710,7 @@ public class TestContainerReuse {
     TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
         new TaskSchedulerEventHandlerForTest(
           appContext, eventHandler, rmClient,
-          new AlwaysMatchesContainerMatcher());
+          new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
     TaskSchedulerEventHandler taskSchedulerEventHandler =
         spy(taskSchedulerEventHandlerReal);
     taskSchedulerEventHandler.init(tezConf);
@@ -833,7 +834,7 @@ public class TestContainerReuse {
 
     TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
       new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient,
-        new AlwaysMatchesContainerMatcher());
+        new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
     TaskSchedulerEventHandler taskSchedulerEventHandler =
       spy(taskSchedulerEventHandlerReal);
     taskSchedulerEventHandler.init(tezConf);
@@ -947,7 +948,7 @@ public class TestContainerReuse {
     doAnswer(dagIDAnswer).when(appContext).getCurrentDAGID();
     doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
     
-    TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher());
+    TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
     TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
     taskSchedulerEventHandler.init(tezConf);
     taskSchedulerEventHandler.start();
@@ -1105,7 +1106,7 @@ public class TestContainerReuse {
 
     TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
         new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient,
-            new ContainerContextMatcher());
+            new ContainerContextMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
     TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
     taskSchedulerEventHandler.init(tezConf);
     taskSchedulerEventHandler.start();
@@ -1259,7 +1260,7 @@ public class TestContainerReuse {
 
     TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
         new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient,
-            new AlwaysMatchesContainerMatcher());
+            new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
     TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
     taskSchedulerEventHandler.init(tezConf);
     taskSchedulerEventHandler.start();

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 7b9ac4f..cf21a1d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -145,7 +145,8 @@ public class TestTaskScheduler {
 
     scheduler.initialize();
     drainableAppCallback.drain();
-    verify(mockRMClient).init(conf);
+    // Check the configuration indirectly via the heartbeat interval, rather than asserting
+    // that init() received the very same Configuration instance.
     verify(mockRMClient).setHeartbeatInterval(interval);
 
     RegisterApplicationMasterResponse mockRegResponse =

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index f191175..f8aa1e2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -49,9 +49,12 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.client.DAGClientServer;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
@@ -92,8 +95,10 @@ public class TestTaskSchedulerEventHandler {
     
     public MockTaskSchedulerEventHandler(AppContext appContext,
         DAGClientServer clientService, EventHandler eventHandler,
-        ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) {
-      super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, new LinkedList<NamedEntityDescriptor>(), false);
+        ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI,
+        UserPayload defaultPayload) {
+      super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI,
+          new LinkedList<NamedEntityDescriptor>(), defaultPayload, false);
     }
 
     @Override
@@ -134,8 +139,15 @@ public class TestTaskSchedulerEventHandler {
     mockWebUIService = mock(WebUIService.class);
     when(mockAppContext.getAllContainers()).thenReturn(mockAMContainerMap);
     when(mockClientService.getBindAddress()).thenReturn(new InetSocketAddress(10000));
+    Configuration conf = new Configuration(false);
+    UserPayload userPayload;
+    try {
+      userPayload = TezUtils.createUserPayloadFromConf(conf);
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
     schedulerHandler = new MockTaskSchedulerEventHandler(
-        mockAppContext, mockClientService, mockEventHandler, mockSigMatcher, mockWebUIService);
+        mockAppContext, mockClientService, mockEventHandler, mockSigMatcher, mockWebUIService, userPayload);
   }
 
   @Test(timeout = 5000)

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 60d37e9..59ab00a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -59,7 +60,10 @@ import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
 import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
@@ -125,22 +129,26 @@ class TestTaskSchedulerHelpers {
 
     private TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync;
     private ContainerSignatureMatcher containerSignatureMatcher;
+    private UserPayload defaultPayload;
 
     @SuppressWarnings("rawtypes")
     public TaskSchedulerEventHandlerForTest(AppContext appContext,
         EventHandler eventHandler,
         TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync,
-        ContainerSignatureMatcher containerSignatureMatcher) {
-      super(appContext, null, eventHandler, containerSignatureMatcher, null, new LinkedList<NamedEntityDescriptor>(), false);
+        ContainerSignatureMatcher containerSignatureMatcher,
+        UserPayload defaultPayload) {
+      super(appContext, null, eventHandler, containerSignatureMatcher, null,
+          new LinkedList<NamedEntityDescriptor>(), defaultPayload, false);
       this.amrmClientAsync = amrmClientAsync;
       this.containerSignatureMatcher = containerSignatureMatcher;
+      this.defaultPayload = defaultPayload;
     }
 
     @Override
     public void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) {
       TaskSchedulerContext taskSchedulerContext =
           new TaskSchedulerContextImpl(this, appContext, 0, trackingUrl, 1000, host, port,
-              getConfig());
+              defaultPayload);
       TaskSchedulerContextImplWrapper wrapper =
           new TaskSchedulerContextImplWrapper(taskSchedulerContext,
               new CountingExecutorService(appCallbackExecutor));
@@ -287,8 +295,8 @@ class TestTaskSchedulerHelpers {
     // Not incrementing invocations for methods which to not obtain locks,
     // and do not go via the executor service.
     @Override
-    public Configuration getInitialConfiguration() {
-      return real.getInitialConfiguration();
+    public UserPayload getInitialUserPayload() {
+      return real.getInitialUserPayload();
     }
 
     @Override
@@ -523,7 +531,13 @@ class TestTaskSchedulerHelpers {
     when(mockContext.getAppTrackingUrl()).thenReturn(appUrl);
 
     when(mockContext.getAMState()).thenReturn(TaskSchedulerContext.AMState.RUNNING_APP);
-    when(mockContext.getInitialConfiguration()).thenReturn(conf);
+    UserPayload userPayload;
+    try {
+      userPayload = TezUtils.createUserPayloadFromConf(conf);
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    when(mockContext.getInitialUserPayload()).thenReturn(userPayload);
     when(mockContext.isSession()).thenReturn(isSession);
     if (containerSignatureMatcher != null) {
       when(mockContext.getContainerSignatureMatcher())

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
index 0002b42..f31a07b 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
@@ -19,10 +19,12 @@ import java.net.InetSocketAddress;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -49,17 +51,22 @@ public class TezTestServiceContainerLauncher extends ContainerLauncher {
   private final int servicePort;
   private final TezTestServiceCommunicator communicator;
   private final ApplicationAttemptId appAttemptId;
-  //  private final TaskAttemptListener tal;
+  private final Configuration conf;
 
 
   // Configuration passed in here to set up final parameters
   public TezTestServiceContainerLauncher(ContainerLauncherContext containerLauncherContext) {
     super(containerLauncherContext);
-    int numThreads = getContext().getInitialConfiguration().getInt(
+    try {
+      conf = TezUtils.createConfFromUserPayload(getContext().getInitialUserPayload());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    int numThreads = conf.getInt(
         TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS,
         TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT);
 
-    this.servicePort = getContext().getInitialConfiguration().getInt(
+    this.servicePort = conf.getInt(
         TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1);
     Preconditions.checkArgument(servicePort > 0,
         TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be set");
@@ -70,7 +77,7 @@ public class TezTestServiceContainerLauncher extends ContainerLauncher {
 
   @Override
   public void start() {
-    communicator.init(getContext().getInitialConfiguration());
+    communicator.init(conf);
     communicator.start();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
index 7d209bc..0d87995 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
@@ -14,6 +14,7 @@
 
 package org.apache.tez.dag.app.rm;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
@@ -32,6 +33,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.service.TezTestServiceConfConstants;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
@@ -74,7 +77,12 @@ public class TezTestServiceTaskSchedulerService extends TaskScheduler {
     this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(),
         taskSchedulerContext.getCustomClusterIdentifier());
 
-    Configuration conf = taskSchedulerContext.getInitialConfiguration();
+    Configuration conf = null;
+    try {
+      conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload());
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
     this.memoryPerInstance = conf
         .getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, -1);
     Preconditions.checkArgument(memoryPerInstance > 0,

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
index 078ea79..ef8f9e4 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -76,7 +76,7 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
   @Override
   public void initialize() throws Exception {
     super.initialize();
-    this.communicator.init(getContext().getInitialConfiguration());
+    this.communicator.init(getConf());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
index 07dd363..2c52ae3 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
@@ -24,10 +24,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
 import org.apache.tez.dag.api.client.DAGClient;
@@ -136,17 +138,22 @@ public class TestExternalTezServices {
     confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
     confForJobs.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
 
+    UserPayload userPayload = TezUtils.createUserPayloadFromConf(confForJobs);
+
     TaskSchedulerDescriptor[] taskSchedulerDescriptors = new TaskSchedulerDescriptor[]{
         TaskSchedulerDescriptor
-            .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName())};
+            .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName())
+            .setUserPayload(userPayload)};
 
     ContainerLauncherDescriptor[] containerLauncherDescriptors = new ContainerLauncherDescriptor[]{
         ContainerLauncherDescriptor
-            .create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName())};
+            .create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName())
+            .setUserPayload(userPayload)};
 
     TaskCommunicatorDescriptor[] taskCommunicatorDescriptors = new TaskCommunicatorDescriptor[]{
         TaskCommunicatorDescriptor
-            .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName())};
+            .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName())
+            .setUserPayload(userPayload)};
 
     ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true, true,
         taskSchedulerDescriptors, containerLauncherDescriptors, taskCommunicatorDescriptors);