You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/21 03:36:43 UTC

[35/50] [abbrv] tez git commit: TEZ-2657. Add tests for client side changes - specifying plugins, etc. (sseth)

TEZ-2657. Add tests for client side changes - specifying plugins, etc. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0245665f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0245665f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0245665f

Branch: refs/heads/TEZ-2003
Commit: 0245665f7d6f7c2869f029cc14eb63a197c99e3b
Parents: acac8dc
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Jul 29 18:26:01 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Aug 20 18:24:23 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |   1 +
 .../java/org/apache/tez/client/TezClient.java   |  15 +-
 .../org/apache/tez/client/TezClientUtils.java   |  38 +---
 .../apache/tez/dag/api/DagTypeConverters.java   |  67 +++++--
 .../java/org/apache/tez/dag/api/Vertex.java     |  41 ++++
 .../api/ServicePluginsDescriptor.java           |  36 ++++
 tez-api/src/main/proto/DAGApiRecords.proto      |   2 +-
 .../org/apache/tez/client/TestTezClient.java    | 111 +++++++++--
 .../apache/tez/client/TestTezClientUtils.java   |  16 +-
 .../org/apache/tez/dag/api/TestDAGPlan.java     |  63 +++++-
 .../tez/dag/api/TestDagTypeConverters.java      | 196 ++++++++++++++++++-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   4 +-
 12 files changed, 507 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/0245665f/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 9d72d92..9b3967a 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -40,5 +40,6 @@ ALL CHANGES:
   TEZ-2652. Cleanup the way services are specified for an AM and vertices.
   TEZ-2653. Change service contexts to expose a user specified payload instead of the AM configuration.
   TEZ-2441. Add tests for TezTaskRunner2.
+  TEZ-2657. Add tests for client side changes - specifying plugins, etc.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/0245665f/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index ee6280a..27f0a81 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -96,13 +96,16 @@ public class TezClient {
   @VisibleForTesting
   static final String NO_CLUSTER_DIAGNOSTICS_MSG = "No cluster diagnostics found.";
 
-  private final String clientName;
+  @VisibleForTesting
+  final String clientName;
   private ApplicationId sessionAppId;
   private ApplicationId lastSubmittedAppId;
-  private AMConfiguration amConfig;
+  @VisibleForTesting
+  final AMConfiguration amConfig;
   private FrameworkClient frameworkClient;
   private String diagnostics;
-  private boolean isSession;
+  @VisibleForTesting
+  final boolean isSession;
   private boolean sessionStarted = false;
   private boolean sessionStopped = false;
   /** Tokens which will be required for all DAGs submitted to this session. */
@@ -114,8 +117,10 @@ public class TezClient {
   private JobTokenSecretManager jobTokenSecretManager =
       new JobTokenSecretManager();
   private final Map<String, LocalResource> additionalLocalResources = Maps.newHashMap();
-  private final TezApiVersionInfo apiVersionInfo;
-  private final ServicePluginsDescriptor servicePluginsDescriptor;
+  @VisibleForTesting
+  final TezApiVersionInfo apiVersionInfo;
+  @VisibleForTesting
+  final ServicePluginsDescriptor servicePluginsDescriptor;
   private HistoryACLPolicyManager historyACLPolicyManager;
 
   private int preWarmDAGCounter = 0;

http://git-wip-us.apache.org/repos/asf/tez/blob/0245665f/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 9cf1f3f..6086fa1 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -39,9 +39,7 @@ import java.util.Map.Entry;
 
 import com.google.common.base.Strings;
 import org.apache.commons.lang.StringUtils;
-import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
-import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
 import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -778,47 +776,13 @@ public class TezClientUtils {
     }
 
     AMPluginDescriptorProto pluginDescriptorProto =
-        createAMServicePluginDescriptorProto(servicePluginsDescriptor);
+        DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor);
     builder.setAmPluginDescriptor(pluginDescriptorProto);
 
     return builder.build();
   }
 
-  static AMPluginDescriptorProto createAMServicePluginDescriptorProto(
-      ServicePluginsDescriptor servicePluginsDescriptor) {
-    AMPluginDescriptorProto.Builder pluginDescriptorBuilder =
-        AMPluginDescriptorProto.newBuilder();
-    if (servicePluginsDescriptor != null) {
 
-      pluginDescriptorBuilder.setContainersEnabled(servicePluginsDescriptor.areContainersEnabled());
-      pluginDescriptorBuilder.setUberEnabled(servicePluginsDescriptor.isUberEnabled());
-
-      if (servicePluginsDescriptor.getTaskSchedulerDescriptors() != null &&
-          servicePluginsDescriptor.getTaskSchedulerDescriptors().length > 0) {
-        List<TezNamedEntityDescriptorProto> namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto(
-            servicePluginsDescriptor.getTaskSchedulerDescriptors());
-        pluginDescriptorBuilder.addAllTaskScedulers(namedEntityProtos);
-      }
-
-      if (servicePluginsDescriptor.getContainerLauncherDescriptors() != null &&
-          servicePluginsDescriptor.getContainerLauncherDescriptors().length > 0) {
-        List<TezNamedEntityDescriptorProto> namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto(
-            servicePluginsDescriptor.getContainerLauncherDescriptors());
-        pluginDescriptorBuilder.addAllContainerLaunchers(namedEntityProtos);
-      }
-
-      if (servicePluginsDescriptor.getTaskCommunicatorDescriptors() != null &&
-          servicePluginsDescriptor.getTaskCommunicatorDescriptors().length > 0) {
-        List<TezNamedEntityDescriptorProto> namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto(
-            servicePluginsDescriptor.getTaskCommunicatorDescriptors());
-        pluginDescriptorBuilder.addAllTaskCommunicators(namedEntityProtos);
-      }
-
-    } else {
-      pluginDescriptorBuilder.setContainersEnabled(true).setUberEnabled(false);
-    }
-    return pluginDescriptorBuilder.build();
-  }
 
   /**
    * Helper function to create a YARN LocalResource

http://git-wip-us.apache.org/repos/asf/tez/blob/0245665f/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 2e0d417..61e4d33 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -52,9 +52,11 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezAppMasterStatusProto;
 import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
@@ -74,14 +76,13 @@ import org.apache.tez.dag.api.records.DAGProtos.TezCounterProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezCountersProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.VertexExecutionContextProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexLocationHintProto;
 
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ByteString.Output;
-import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
-import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
-import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
 
 @Private
 public class DagTypeConverters {
@@ -732,13 +733,13 @@ public class DagTypeConverters {
     return payload.getPayload();
   }
 
-  public static DAGProtos.VertexExecutionContextProto convertToProto(
-      Vertex.VertexExecutionContext context) {
+  public static VertexExecutionContextProto convertToProto(
+      VertexExecutionContext context) {
     if (context == null) {
       return null;
     } else {
-      DAGProtos.VertexExecutionContextProto.Builder builder =
-          DAGProtos.VertexExecutionContextProto.newBuilder();
+      VertexExecutionContextProto.Builder builder =
+          VertexExecutionContextProto.newBuilder();
       builder.setExecuteInAm(context.shouldExecuteInAm());
       builder.setExecuteInContainers(context.shouldExecuteInContainers());
       if (context.getTaskSchedulerName() != null) {
@@ -754,26 +755,26 @@ public class DagTypeConverters {
     }
   }
 
-  public static Vertex.VertexExecutionContext convertFromProto(
-      DAGProtos.VertexExecutionContextProto proto) {
+  public static VertexExecutionContext convertFromProto(
+      VertexExecutionContextProto proto) {
     if (proto == null) {
       return null;
     } else {
       if (proto.getExecuteInAm()) {
-        Vertex.VertexExecutionContext context =
-            Vertex.VertexExecutionContext.createExecuteInAm(proto.getExecuteInAm());
+        VertexExecutionContext context =
+            VertexExecutionContext.createExecuteInAm(proto.getExecuteInAm());
         return context;
       } else if (proto.getExecuteInContainers()) {
-        Vertex.VertexExecutionContext context =
-            Vertex.VertexExecutionContext.createExecuteInContainers(proto.getExecuteInContainers());
+        VertexExecutionContext context =
+            VertexExecutionContext.createExecuteInContainers(proto.getExecuteInContainers());
         return context;
       } else {
         String taskScheduler = proto.hasTaskSchedulerName() ? proto.getTaskSchedulerName() : null;
         String containerLauncher =
             proto.hasContainerLauncherName() ? proto.getContainerLauncherName() : null;
         String taskComm = proto.hasTaskCommName() ? proto.getTaskCommName() : null;
-        Vertex.VertexExecutionContext context =
-            Vertex.VertexExecutionContext.create(taskScheduler, containerLauncher, taskComm);
+        VertexExecutionContext context =
+            VertexExecutionContext.create(taskScheduler, containerLauncher, taskComm);
         return context;
       }
     }
@@ -800,4 +801,40 @@ public class DagTypeConverters {
     return builder.build();
   }
 
+  public static AMPluginDescriptorProto convertServicePluginDescriptoToProto(
+      ServicePluginsDescriptor servicePluginsDescriptor) {
+    AMPluginDescriptorProto.Builder pluginDescriptorBuilder =
+        AMPluginDescriptorProto.newBuilder();
+    if (servicePluginsDescriptor != null) {
+
+      pluginDescriptorBuilder.setContainersEnabled(servicePluginsDescriptor.areContainersEnabled());
+      pluginDescriptorBuilder.setUberEnabled(servicePluginsDescriptor.isUberEnabled());
+
+      if (servicePluginsDescriptor.getTaskSchedulerDescriptors() != null &&
+          servicePluginsDescriptor.getTaskSchedulerDescriptors().length > 0) {
+        List<TezNamedEntityDescriptorProto> namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto(
+            servicePluginsDescriptor.getTaskSchedulerDescriptors());
+        pluginDescriptorBuilder.addAllTaskSchedulers(namedEntityProtos);
+      }
+
+      if (servicePluginsDescriptor.getContainerLauncherDescriptors() != null &&
+          servicePluginsDescriptor.getContainerLauncherDescriptors().length > 0) {
+        List<TezNamedEntityDescriptorProto> namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto(
+            servicePluginsDescriptor.getContainerLauncherDescriptors());
+        pluginDescriptorBuilder.addAllContainerLaunchers(namedEntityProtos);
+      }
+
+      if (servicePluginsDescriptor.getTaskCommunicatorDescriptors() != null &&
+          servicePluginsDescriptor.getTaskCommunicatorDescriptors().length > 0) {
+        List<TezNamedEntityDescriptorProto> namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto(
+            servicePluginsDescriptor.getTaskCommunicatorDescriptors());
+        pluginDescriptorBuilder.addAllTaskCommunicators(namedEntityProtos);
+      }
+
+    } else {
+      pluginDescriptorBuilder.setContainersEnabled(true).setUberEnabled(false);
+    }
+    return pluginDescriptorBuilder.build();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0245665f/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 34124b2..8953ae1 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -511,6 +511,47 @@ public class Vertex {
           ", taskCommName='" + taskCommName + '\'' +
           '}';
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      VertexExecutionContext that = (VertexExecutionContext) o;
+
+      if (executeInAm != that.executeInAm) {
+        return false;
+      }
+      if (executeInContainers != that.executeInContainers) {
+        return false;
+      }
+      if (taskSchedulerName != null ? !taskSchedulerName.equals(that.taskSchedulerName) :
+          that.taskSchedulerName != null) {
+        return false;
+      }
+      if (containerLauncherName != null ?
+          !containerLauncherName.equals(that.containerLauncherName) :
+          that.containerLauncherName != null) {
+        return false;
+      }
+      return !(taskCommName != null ? !taskCommName.equals(that.taskCommName) :
+          that.taskCommName != null);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = (executeInAm ? 1 : 0);
+      result = 31 * result + (executeInContainers ? 1 : 0);
+      result = 31 * result + (taskSchedulerName != null ? taskSchedulerName.hashCode() : 0);
+      result = 31 * result + (containerLauncherName != null ? containerLauncherName.hashCode() : 0);
+      result = 31 * result + (taskCommName != null ? taskCommName.hashCode() : 0);
+      return result;
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/0245665f/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
index 8df102a..2e4fc46 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
@@ -46,6 +46,15 @@ public class ServicePluginsDescriptor {
     this.taskCommunicatorDescriptors = taskCommunicatorDescriptors;
   }
 
+  /**
+   * Create a service plugin descriptor with the provided plugins. Regular containers will also be enabled
+   * when using this method.
+   *
+   * @param taskSchedulerDescriptor the task scheduler plugin descriptors
+   * @param containerLauncherDescriptors the container launcher plugin descriptors
+   * @param taskCommunicatorDescriptors the task communicator plugin descriptors
+   * @return a {@link ServicePluginsDescriptor} configured with the provided plugin descriptors
+   */
   public static ServicePluginsDescriptor create(TaskSchedulerDescriptor[] taskSchedulerDescriptor,
                                                 ContainerLauncherDescriptor[] containerLauncherDescriptors,
                                                 TaskCommunicatorDescriptor[] taskCommunicatorDescriptors) {
@@ -53,6 +62,15 @@ public class ServicePluginsDescriptor {
         containerLauncherDescriptors, taskCommunicatorDescriptors);
   }
 
+  /**
+   * Create a service plugin descriptor with the provided plugins. Also allows specification of whether
+   * in-AM execution is enabled. Container execution is enabled by default.
+   * @param enableUber whether to enable execution in the AM or not
+   * @param taskSchedulerDescriptor the task scheduler plugin descriptors
+   * @param containerLauncherDescriptors the container launcher plugin descriptors
+   * @param taskCommunicatorDescriptors the task communicator plugin descriptors
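+   * @return a {@link ServicePluginsDescriptor} configured with the provided plugins and in-AM execution setting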
+   */
   public static ServicePluginsDescriptor create(boolean enableUber,
                                                 TaskSchedulerDescriptor[] taskSchedulerDescriptor,
                                                 ContainerLauncherDescriptor[] containerLauncherDescriptors,
@@ -61,6 +79,17 @@ public class ServicePluginsDescriptor {
         containerLauncherDescriptors, taskCommunicatorDescriptors);
   }
 
+  /**
+   * Create a service plugin descriptor with the provided plugins. Also allows specification of whether
+   * container execution and in-AM execution will be enabled.
+   *
+   * @param enableContainers whether to enable execution in containers
+   * @param enableUber whether to enable execution in the AM or not
+   * @param taskSchedulerDescriptor the task scheduler plugin descriptors
+   * @param containerLauncherDescriptors the container launcher plugin descriptors
+   * @param taskCommunicatorDescriptors the task communicator plugin descriptors
+   * @return a {@link ServicePluginsDescriptor} configured with the provided plugins and execution settings
+   */
   public static ServicePluginsDescriptor create(boolean enableContainers, boolean enableUber,
                                                 TaskSchedulerDescriptor[] taskSchedulerDescriptor,
                                                 ContainerLauncherDescriptor[] containerLauncherDescriptors,
@@ -69,6 +98,13 @@ public class ServicePluginsDescriptor {
         containerLauncherDescriptors, taskCommunicatorDescriptors);
   }
 
+  /**
+   * Create a service plugin descriptor which may have in-AM execution of tasks enabled. Container
+   * execution is enabled by default.
+   *
+   * @param enableUber whether to enable execution in the AM or not
+   * @return a {@link ServicePluginsDescriptor} with container execution enabled and no plugins specified
+   */
   public static ServicePluginsDescriptor create(boolean enableUber) {
     return new ServicePluginsDescriptor(true, enableUber, null, null, null);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/0245665f/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index ebe3259..193f7b8 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -180,7 +180,7 @@ message TezNamedEntityDescriptorProto {
 message AMPluginDescriptorProto {
   optional bool containers_enabled = 1 [default = true];
   optional bool uber_enabled = 2 [default = false];
-  repeated TezNamedEntityDescriptorProto task_scedulers = 3;
+  repeated TezNamedEntityDescriptorProto task_schedulers = 3;
   repeated TezNamedEntityDescriptorProto container_launchers = 4;
   repeated TezNamedEntityDescriptorProto task_communicators = 5;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0245665f/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 92d3cd2..66b273a 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -27,6 +27,11 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
@@ -65,6 +70,7 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRespo
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezAppMasterStatusProto;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.Test;
@@ -165,11 +171,11 @@ public class TestTezClient {
       verify(client.mockYarnClient, times(1)).submitApplication(captor.capture());
       ApplicationSubmissionContext context = captor.getValue();
       Assert.assertEquals(3, context.getAMContainerSpec().getLocalResources().size());
-      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+      assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
           TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
-      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+      assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
           TezConstants.TEZ_PB_BINARY_CONF_NAME));
-      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+      assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
           lrName1));
     } else {
       verify(client.mockYarnClient, times(0)).submitApplication(captor.capture());
@@ -184,7 +190,7 @@ public class TestTezClient {
     DAG dag = DAG.create("DAG").addVertex(vertex).addTaskLocalFiles(lrDAG);
     DAGClient dagClient = client.submitDAG(dag);
         
-    Assert.assertTrue(dagClient.getExecutionContext().contains(client.mockAppId.toString()));
+    assertTrue(dagClient.getExecutionContext().contains(client.mockAppId.toString()));
     
     if (isSession) {
       verify(client.mockYarnClient, times(1)).submitApplication(captor.capture());
@@ -193,13 +199,13 @@ public class TestTezClient {
       verify(client.mockYarnClient, times(1)).submitApplication(captor.capture());
       ApplicationSubmissionContext context = captor.getValue();
       Assert.assertEquals(4, context.getAMContainerSpec().getLocalResources().size());
-      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+      assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
           TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
-      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+      assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
           TezConstants.TEZ_PB_BINARY_CONF_NAME));
-      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+      assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
           TezConstants.TEZ_PB_PLAN_BINARY_NAME));
-      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+      assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
           lrName1));
     }
     
@@ -223,7 +229,7 @@ public class TestTezClient {
     if (isSession) {
       // same app master
       verify(client.mockYarnClient, times(1)).submitApplication(captor.capture());
-      Assert.assertTrue(dagClient.getExecutionContext().contains(client.mockAppId.toString()));
+      assertTrue(dagClient.getExecutionContext().contains(client.mockAppId.toString()));
       // additional resource is sent
       ArgumentCaptor<SubmitDAGRequestProto> captor1 = ArgumentCaptor.forClass(SubmitDAGRequestProto.class);
       verify(client.sessionAmProxy, times(2)).submitDAG((RpcController)any(), captor1.capture());
@@ -232,20 +238,20 @@ public class TestTezClient {
       Assert.assertEquals(lrName2, proto.getAdditionalAmResources().getLocalResources(0).getName());
     } else {
       // new app master
-      Assert.assertTrue(dagClient.getExecutionContext().contains(appId2.toString()));
+      assertTrue(dagClient.getExecutionContext().contains(appId2.toString()));
       verify(client.mockYarnClient, times(2)).submitApplication(captor.capture());
       // additional resource is added
       ApplicationSubmissionContext context = captor.getValue();
       Assert.assertEquals(5, context.getAMContainerSpec().getLocalResources().size());
-      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+      assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
           TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
-      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+      assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
           TezConstants.TEZ_PB_BINARY_CONF_NAME));
-      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+      assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
           TezConstants.TEZ_PB_PLAN_BINARY_NAME));
-      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+      assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
           lrName1));
-      Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+      assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
           lrName2));
     }
     
@@ -275,7 +281,7 @@ public class TestTezClient {
     ArgumentCaptor<SubmitDAGRequestProto> captor1 = ArgumentCaptor.forClass(SubmitDAGRequestProto.class);
     verify(client.sessionAmProxy, times(1)).submitDAG((RpcController)any(), captor1.capture());
     SubmitDAGRequestProto proto = captor1.getValue();
-    Assert.assertTrue(proto.getDAGPlan().getName().startsWith(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX));
+    assertTrue(proto.getDAGPlan().getName().startsWith(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX));
 
     client.stop();
   }
@@ -360,7 +366,7 @@ public class TestTezClient {
       client.waitTillReady();
       fail();
     } catch (SessionNotRunning e) {
-      Assert.assertTrue(e.getMessage().contains(msg));
+      assertTrue(e.getMessage().contains(msg));
     }
     client.stop();
   }
@@ -375,7 +381,7 @@ public class TestTezClient {
       client.waitTillReady();
       fail();
     } catch (SessionNotRunning e) {
-      Assert.assertTrue(e.getMessage().contains(TezClient.NO_CLUSTER_DIAGNOSTICS_MSG));
+      assertTrue(e.getMessage().contains(TezClient.NO_CLUSTER_DIAGNOSTICS_MSG));
     }
     client.stop();
   }
@@ -400,7 +406,7 @@ public class TestTezClient {
       client.submitDAG(dag);
       fail();
     } catch (SessionNotRunning e) {
-      Assert.assertTrue(e.getMessage().contains(msg));
+      assertTrue(e.getMessage().contains(msg));
     }
     client.stop();
   }
@@ -429,4 +435,71 @@ public class TestTezClient {
     }
   }
 
+  @Test(timeout = 5000)
+  public void testClientBuilder() {
+    TezConfiguration tezConfWitSession = new TezConfiguration();
+    tezConfWitSession.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
+
+    TezConfiguration tezConfNoSession = new TezConfiguration();
+    tezConfNoSession.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false);
+
+    AMConfiguration amConf;
+    TezClient tezClient;
+    Credentials credentials = new Credentials();
+    Map<String, LocalResource> localResourceMap = new HashMap<>();
+    localResourceMap.put("testResource", mock(LocalResource.class));
+    ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true);
+
+    // Session mode via conf
+    tezClient = TezClient.newBuilder("client", tezConfWitSession).build();
+    assertTrue(tezClient.isSession);
+    assertNull(tezClient.servicePluginsDescriptor);
+    assertNotNull(tezClient.apiVersionInfo);
+    amConf = tezClient.amConfig;
+    assertNotNull(amConf);
+    assertEquals(0, amConf.getAMLocalResources().size());
+    assertNull(amConf.getCredentials());
+    assertTrue(
+        amConf.getTezConfiguration().getBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false));
+
+    // Non-Session mode via conf
+    tezClient = TezClient.newBuilder("client", tezConfNoSession).build();
+    assertFalse(tezClient.isSession);
+    assertNull(tezClient.servicePluginsDescriptor);
+    assertNotNull(tezClient.apiVersionInfo);
+    amConf = tezClient.amConfig;
+    assertNotNull(amConf);
+    assertEquals(0, amConf.getAMLocalResources().size());
+    assertNull(amConf.getCredentials());
+    assertFalse(
+        amConf.getTezConfiguration().getBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true));
+
+    // no-session via config. API explicit session.
+    tezClient = TezClient.newBuilder("client", tezConfNoSession).setIsSession(true).build();
+    assertTrue(tezClient.isSession);
+    assertNull(tezClient.servicePluginsDescriptor);
+    assertNotNull(tezClient.apiVersionInfo);
+    amConf = tezClient.amConfig;
+    assertNotNull(amConf);
+    assertEquals(0, amConf.getAMLocalResources().size());
+    assertNull(amConf.getCredentials());
+    assertTrue(
+        amConf.getTezConfiguration().getBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false));
+
+    // Plugins, credentials, local resources
+    tezClient = TezClient.newBuilder("client", tezConfWitSession).setCredentials(credentials)
+        .setLocalResources(localResourceMap).setServicePluginDescriptor(servicePluginsDescriptor)
+        .build();
+    assertTrue(tezClient.isSession);
+    assertEquals(servicePluginsDescriptor, tezClient.servicePluginsDescriptor);
+    assertNotNull(tezClient.apiVersionInfo);
+    amConf = tezClient.amConfig;
+    assertNotNull(amConf);
+    assertEquals(1, amConf.getAMLocalResources().size());
+    assertEquals(localResourceMap, amConf.getAMLocalResources());
+    assertEquals(credentials, amConf.getCredentials());
+    assertTrue(
+        amConf.getTezConfiguration().getBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0245665f/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
index 8946ef0..8f40bbd 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
@@ -70,6 +70,7 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
 import org.junit.Assert;
 import org.junit.Test;
 /**
@@ -500,7 +501,8 @@ public class TestTezClientUtils {
     Assert.assertNotNull(javaOpts);
     Assert.assertTrue(javaOpts.contains("-D" + TezConstants.TEZ_ROOT_LOGGER_NAME + "=FOOBAR")
         && javaOpts.contains(TezConstants.TEZ_CONTAINER_LOG4J_PROPERTIES_FILE)
-        && javaOpts.contains("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator"));
+        &&
+        javaOpts.contains("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator"));
   }
 
   @Test (timeout = 5000)
@@ -677,6 +679,16 @@ public class TestTezClientUtils {
     Assert.assertTrue(resourceNames.contains("dir2-f.txt"));
   }
 
-  // TODO TEZ-2003 Add test to validate ServicePluginDescriptor propagation
+  /**
+   * Checks that a {@link ServicePluginsDescriptor} passed to
+   * {@code TezClientUtils.createFinalConfProtoForApp} ends up in the resulting
+   * {@link ConfigurationProto} as an AM plugin descriptor with uber mode enabled.
+   */
+  @Test(timeout = 5000)
+  public void testServiceDescriptorSerializationForAM() {
+    ServicePluginsDescriptor uberDescriptor = ServicePluginsDescriptor.create(true);
+    Configuration emptyConf = new Configuration(false);
+
+    ConfigurationProto serialized =
+        TezClientUtils.createFinalConfProtoForApp(emptyConf, null, uberDescriptor);
+
+    Assert.assertTrue(serialized.hasAmPluginDescriptor());
+    Assert.assertTrue(serialized.getAmPluginDescriptor().getUberEnabled());
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0245665f/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index fccbb08..cd42109 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -37,11 +37,14 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
+import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
+import org.apache.tez.dag.api.records.DAGProtos.VertexExecutionContextProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -131,7 +134,8 @@ public class TestDAGPlan {
     EdgeManagerPluginDescriptor emDesc = edgeProperty.getEdgeManagerDescriptor();
     Assert.assertNotNull(emDesc);
     Assert.assertEquals("emClass", emDesc.getClassName());
-    Assert.assertTrue(Arrays.equals("emPayload".getBytes(), emDesc.getUserPayload().deepCopyAsArray()));
+    Assert.assertTrue(
+        Arrays.equals("emPayload".getBytes(), emDesc.getUserPayload().deepCopyAsArray()));
   }
 
   @Test(timeout = 5000)
@@ -311,4 +315,61 @@ public class TestDAGPlan {
     assertNotNull(fetchedCredentials.getToken(new Text("Token1")));
     assertNotNull(fetchedCredentials.getToken(new Text("Token2")));
   }
+
+  /**
+   * Builds a two-vertex DAG that has a DAG-level default execution context plus an explicit
+   * context on v1 only, and checks how each is represented in the generated {@link DAGPlan}.
+   */
+  @Test(timeout = 5000)
+  public void testServiceDescriptorPropagation() {
+    // Execution contexts: a plugin-based default for the DAG, and in-AM execution for v1.
+    VertexExecutionContext dagWideContext =
+        VertexExecutionContext.create("plugin", "plugin", "plugin");
+    VertexExecutionContext inAmContext = VertexExecutionContext.createExecuteInAm(true);
+
+    ProcessorDescriptor firstProcessor = ProcessorDescriptor.create("processor1")
+        .setUserPayload(UserPayload.create(ByteBuffer.wrap("processor1Bytes".getBytes())));
+    ProcessorDescriptor secondProcessor = ProcessorDescriptor.create("processor2")
+        .setUserPayload(UserPayload.create(ByteBuffer.wrap("processor2Bytes".getBytes())));
+
+    // Only v1 carries its own execution context; v2 is created without one.
+    Vertex v1 = Vertex.create("v1", firstProcessor, 10, Resource.newInstance(1024, 1))
+        .setExecutionContext(inAmContext);
+    Vertex v2 = Vertex.create("v2", secondProcessor, 1, Resource.newInstance(1024, 1));
+    for (Vertex vertex : new Vertex[] {v1, v2}) {
+      vertex.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
+          .addTaskLocalFiles(new HashMap<String, LocalResource>());
+    }
+
+    InputDescriptor edgeInput = InputDescriptor.create("input")
+        .setUserPayload(UserPayload.create(ByteBuffer.wrap("inputBytes".getBytes())));
+    OutputDescriptor edgeOutput = OutputDescriptor.create("output")
+        .setUserPayload(UserPayload.create(ByteBuffer.wrap("outputBytes".getBytes())));
+    EdgeProperty edgeProperty = EdgeProperty.create(
+        DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL, edgeOutput, edgeInput);
+    Edge v1ToV2 = Edge.create(v1, v2, edgeProperty);
+
+    DAG dag = DAG.create("testDag");
+    dag.addVertex(v1).addVertex(v2).addEdge(v1ToV2);
+    dag.setExecutionContext(dagWideContext);
+
+    DAGPlan plan = dag.createDag(new TezConfiguration(), null, null, null, true);
+    assertEquals(2, plan.getVertexCount());
+    assertEquals(1, plan.getEdgeCount());
+
+    // DAG-level default: the three plugin names are set, both boolean flags are false.
+    assertTrue(plan.hasDefaultExecutionContext());
+    VertexExecutionContextProto defaultProto = plan.getDefaultExecutionContext();
+    assertFalse(defaultProto.getExecuteInContainers());
+    assertFalse(defaultProto.getExecuteInAm());
+    assertEquals("plugin", defaultProto.getTaskSchedulerName());
+    assertEquals("plugin", defaultProto.getContainerLauncherName());
+    assertEquals("plugin", defaultProto.getTaskCommName());
+
+    // v1: executes in the AM, and no plugin names are present.
+    VertexPlan firstVertexPlan = plan.getVertex(0);
+    assertTrue(firstVertexPlan.hasExecutionContext());
+    VertexExecutionContextProto firstVertexContext = firstVertexPlan.getExecutionContext();
+    assertFalse(firstVertexContext.getExecuteInContainers());
+    assertTrue(firstVertexContext.getExecuteInAm());
+    assertFalse(firstVertexContext.hasTaskSchedulerName());
+    assertFalse(firstVertexContext.hasContainerLauncherName());
+    assertFalse(firstVertexContext.hasTaskCommName());
+
+    // v2: no vertex-level execution context in the plan.
+    VertexPlan secondVertexPlan = plan.getVertex(1);
+    assertFalse(secondVertexPlan.hasExecutionContext());
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0245665f/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
index 51b179a..e37f849 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
@@ -18,15 +18,32 @@
 
 package org.apache.tez.dag.api;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 
 import java.nio.ByteBuffer;
+import java.util.List;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
+import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.VertexExecutionContextProto;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -43,7 +60,7 @@ public class TestDagTypeConverters {
         DagTypeConverters.convertToDAGPlan(entityDescriptor);
     Assert.assertEquals(payload.getVersion(), proto.getTezUserPayload().getVersion());
     Assert.assertArrayEquals(payload.deepCopyAsArray(), proto.getTezUserPayload().getUserPayload().toByteArray());
-    Assert.assertTrue(proto.hasHistoryText());
+    assertTrue(proto.hasHistoryText());
     Assert.assertNotEquals(historytext, proto.getHistoryText());
     Assert.assertEquals(historytext, new String(
         TezCommonUtils.decompressByteStringToByteArray(proto.getHistoryText())));
@@ -89,4 +106,181 @@ public class TestDagTypeConverters {
     Assert.assertEquals(2311, lr2UrlDeserialized.getPort());
   }
 
+
+  /**
+   * Round-trips the three kinds of {@link VertexExecutionContext} (in-AM, in-containers and
+   * custom plugins) through their proto form and expects an equal object back each time.
+   */
+  @Test(timeout = 5000)
+  public void testVertexExecutionContextTranslation() {
+    // Uber
+    assertContextRoundTrip(VertexExecutionContext.createExecuteInAm(true));
+
+    // Regular containers
+    assertContextRoundTrip(VertexExecutionContext.createExecuteInContainers(true));
+
+    // Custom
+    assertContextRoundTrip(VertexExecutionContext.create("plugin", "plugin", "plugin"));
+  }
+
+  /** Converts {@code context} to a proto and back, asserting the result equals the input. */
+  private void assertContextRoundTrip(VertexExecutionContext context) {
+    VertexExecutionContextProto asProto = DagTypeConverters.convertToProto(context);
+    VertexExecutionContext restored = DagTypeConverters.convertFromProto(asProto);
+    assertEquals(context, restored);
+  }
+
+
+  static final String testScheduler = "testScheduler"; // base name for task scheduler entities
+  static final String testLauncher = "testLauncher"; // base name for container launcher entities
+  static final String testComm = "testComm"; // base name for task communicator entities
+  static final String classSuffix = "_class"; // appended to an entity name to form its class name
+
+  /**
+   * Converts several {@link ServicePluginsDescriptor} configurations to
+   * {@link AMPluginDescriptorProto} and checks the uber/containers flags and the plugin lists
+   * of each resulting proto.
+   */
+  @Test(timeout = 5000)
+  public void testServiceDescriptorTranslation() {
+    // Uber-execution, no plugins specified.
+    ServicePluginsDescriptor uberOnly = ServicePluginsDescriptor.create(true);
+    AMPluginDescriptorProto uberOnlyProto =
+        DagTypeConverters.convertServicePluginDescriptoToProto(uberOnly);
+    assertUberAndContainerFlags(uberOnlyProto, true, true);
+    assertEquals(0, uberOnlyProto.getTaskSchedulersCount());
+    assertEquals(0, uberOnlyProto.getContainerLaunchersCount());
+    assertEquals(0, uberOnlyProto.getTaskCommunicatorsCount());
+
+    // One plugin of each kind; only the task communicator carries a payload.
+    TaskSchedulerDescriptor[] singleSchedulers = createTaskScheduelrs(1, false);
+    ContainerLauncherDescriptor[] singleLaunchers = createContainerLaunchers(1, false);
+    TaskCommunicatorDescriptor[] singleComms = createTaskCommunicators(1, true);
+    AMPluginDescriptorProto singleProto = DagTypeConverters.convertServicePluginDescriptoToProto(
+        ServicePluginsDescriptor.create(singleSchedulers, singleLaunchers, singleComms));
+    assertUberAndContainerFlags(singleProto, false, true);
+    verifyPlugins(singleProto.getTaskSchedulersList(), 1, testScheduler, false);
+    verifyPlugins(singleProto.getContainerLaunchersList(), 1, testLauncher, false);
+    verifyPlugins(singleProto.getTaskCommunicatorsList(), 1, testComm, true);
+
+    // Three plugins of each kind, all with a payload.
+    TaskSchedulerDescriptor[] multiSchedulers = createTaskScheduelrs(3, true);
+    ContainerLauncherDescriptor[] multiLaunchers = createContainerLaunchers(3, true);
+    TaskCommunicatorDescriptor[] multiComms = createTaskCommunicators(3, true);
+    AMPluginDescriptorProto multiProto = DagTypeConverters.convertServicePluginDescriptoToProto(
+        ServicePluginsDescriptor.create(multiSchedulers, multiLaunchers, multiComms));
+    assertUberAndContainerFlags(multiProto, false, true);
+    verifyPlugins(multiProto.getTaskSchedulersList(), 3, testScheduler, true);
+    verifyPlugins(multiProto.getContainerLaunchersList(), 3, testLauncher, true);
+    verifyPlugins(multiProto.getTaskCommunicatorsList(), 3, testComm, true);
+
+    // Single plugin set again, but with container execution disabled and uber enabled.
+    TaskSchedulerDescriptor[] uberSchedulers = createTaskScheduelrs(1, false);
+    ContainerLauncherDescriptor[] uberLaunchers = createContainerLaunchers(1, false);
+    TaskCommunicatorDescriptor[] uberComms = createTaskCommunicators(1, true);
+    AMPluginDescriptorProto uberProto = DagTypeConverters.convertServicePluginDescriptoToProto(
+        ServicePluginsDescriptor.create(false, true, uberSchedulers, uberLaunchers, uberComms));
+    assertUberAndContainerFlags(uberProto, true, false);
+    verifyPlugins(uberProto.getTaskSchedulersList(), 1, testScheduler, false);
+    verifyPlugins(uberProto.getContainerLaunchersList(), 1, testLauncher, false);
+    verifyPlugins(uberProto.getTaskCommunicatorsList(), 1, testComm, true);
+  }
+
+  /**
+   * Asserts that both the uber and containers flags are set on {@code proto} and that they
+   * carry the expected values.
+   */
+  private void assertUberAndContainerFlags(AMPluginDescriptorProto proto, boolean expectUber,
+                                           boolean expectContainers) {
+    assertTrue(proto.hasUberEnabled());
+    assertTrue(proto.hasContainersEnabled());
+    assertEquals(expectUber, proto.getUberEnabled());
+    assertEquals(expectContainers, proto.getContainersEnabled());
+  }
+
+  /**
+   * Asserts that {@code entities} holds exactly {@code expectedCount} named descriptors whose
+   * name is {@code baseString} plus the list index and whose class name is that name plus
+   * {@code classSuffix}. A user payload must be present exactly when {@code hasPayload} is
+   * true, in which case its first int must equal the list index.
+   */
+  private void verifyPlugins(List<TezNamedEntityDescriptorProto> entities, int expectedCount,
+                             String baseString, boolean hasPayload) {
+    assertEquals(expectedCount, entities.size());
+    for (int idx = 0; idx < expectedCount; idx++) {
+      TezNamedEntityDescriptorProto namedEntity = entities.get(idx);
+      String expectedName = indexedEntity(baseString, idx);
+      assertEquals(expectedName, namedEntity.getName());
+
+      TezEntityDescriptorProto descriptorProto = namedEntity.getEntityDescriptor();
+      assertEquals(append(expectedName, classSuffix), descriptorProto.getClassName());
+      assertEquals(hasPayload, descriptorProto.hasTezUserPayload());
+      if (!hasPayload) {
+        continue;
+      }
+
+      // Rebuild the payload from the proto and read back the index stored in it.
+      ByteBuffer payloadBytes = UserPayload.create(
+          descriptorProto.getTezUserPayload().getUserPayload().asReadOnlyByteBuffer(),
+          descriptorProto.getTezUserPayload().getVersion()).getPayload();
+      assertNotNull(payloadBytes);
+      assertEquals(idx, payloadBytes.getInt());
+    }
+  }
+
+  /**
+   * Creates {@code count} task scheduler descriptors named {@code testScheduler<i>} with class
+   * name {@code testScheduler<i>_class}, optionally attaching a payload that encodes the index.
+   * (The method name is misspelled; it is kept as-is because callers use this spelling.)
+   */
+  private TaskSchedulerDescriptor[] createTaskScheduelrs(int count, boolean withUserPayload) {
+    TaskSchedulerDescriptor[] result = new TaskSchedulerDescriptor[count];
+    for (int idx = 0; idx < count; idx++) {
+      String entityName = indexedEntity(testScheduler, idx);
+      TaskSchedulerDescriptor descriptor =
+          TaskSchedulerDescriptor.create(entityName, append(entityName, classSuffix));
+      if (withUserPayload) {
+        descriptor.setUserPayload(createPayload(idx));
+      }
+      result[idx] = descriptor;
+    }
+    return result;
+  }
+
+  /**
+   * Creates {@code count} container launcher descriptors named {@code testLauncher<i>} with
+   * class name {@code testLauncher<i>_class}, optionally attaching a payload that encodes the
+   * index.
+   */
+  private ContainerLauncherDescriptor[] createContainerLaunchers(int count,
+                                                                 boolean withUserPayload) {
+    ContainerLauncherDescriptor[] result = new ContainerLauncherDescriptor[count];
+    for (int idx = 0; idx < count; idx++) {
+      String entityName = indexedEntity(testLauncher, idx);
+      ContainerLauncherDescriptor descriptor =
+          ContainerLauncherDescriptor.create(entityName, append(entityName, classSuffix));
+      if (withUserPayload) {
+        descriptor.setUserPayload(createPayload(idx));
+      }
+      result[idx] = descriptor;
+    }
+    return result;
+  }
+
+  /**
+   * Creates {@code count} task communicator descriptors named {@code testComm<i>} with class
+   * name {@code testComm<i>_class}, optionally attaching a payload that encodes the index.
+   */
+  private TaskCommunicatorDescriptor[] createTaskCommunicators(int count, boolean withUserPayload) {
+    TaskCommunicatorDescriptor[] result = new TaskCommunicatorDescriptor[count];
+    for (int idx = 0; idx < count; idx++) {
+      String entityName = indexedEntity(testComm, idx);
+      TaskCommunicatorDescriptor descriptor =
+          TaskCommunicatorDescriptor.create(entityName, append(entityName, classSuffix));
+      if (withUserPayload) {
+        descriptor.setUserPayload(createPayload(idx));
+      }
+      result[idx] = descriptor;
+    }
+    return result;
+  }
+
+  /**
+   * Wraps {@code i} in a {@link UserPayload} backed by a 4-byte buffer. The value is written
+   * with an absolute put at offset 0.
+   */
+  private static UserPayload createPayload(int i) {
+    ByteBuffer encodedIndex = ByteBuffer.allocate(4);
+    encodedIndex.putInt(0, i);
+    return UserPayload.create(encodedIndex);
+  }
+
+  /** Returns {@code name} immediately followed by the decimal form of {@code index}. */
+  private String indexedEntity(String name, int index) {
+    StringBuilder indexed = new StringBuilder();
+    indexed.append(name).append(index);
+    return indexed.toString();
+  }
+
+  /** Returns the concatenation of {@code s1} and {@code s2}. */
+  private String append(String s1, String s2) {
+    StringBuilder joined = new StringBuilder();
+    joined.append(s1).append(s2);
+    return joined.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0245665f/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 4128841..9b16a90 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -409,9 +409,9 @@ public class DAGAppMaster extends AbstractService {
     }
 
     taskSchedulerDescriptors = parsePlugin(taskSchedulers,
-        (amPluginDescriptorProto == null || amPluginDescriptorProto.getTaskScedulersCount() == 0 ?
+        (amPluginDescriptorProto == null || amPluginDescriptorProto.getTaskSchedulersCount() == 0 ?
             null :
-            amPluginDescriptorProto.getTaskScedulersList()),
+            amPluginDescriptorProto.getTaskSchedulersList()),
         tezYarnEnabled, uberEnabled);
 
     containerLauncherDescriptors = parsePlugin(containerLaunchers,