You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/06 11:26:39 UTC
[47/51] [abbrv] tez git commit: TEZ-2657. Add tests for client side
changes - specifying plugins, etc. (sseth)
TEZ-2657. Add tests for client side changes - specifying plugins, etc. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/306020d2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/306020d2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/306020d2
Branch: refs/heads/TEZ-2003
Commit: 306020d20ad4c23bb9854e9e7fc029f2096948c6
Parents: eb82ca2
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Jul 29 18:26:01 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Aug 6 01:26:58 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../java/org/apache/tez/client/TezClient.java | 15 +-
.../org/apache/tez/client/TezClientUtils.java | 38 +---
.../apache/tez/dag/api/DagTypeConverters.java | 67 +++++--
.../java/org/apache/tez/dag/api/Vertex.java | 41 ++++
.../api/ServicePluginsDescriptor.java | 36 ++++
tez-api/src/main/proto/DAGApiRecords.proto | 2 +-
.../org/apache/tez/client/TestTezClient.java | 113 +++++++++--
.../apache/tez/client/TestTezClientUtils.java | 16 +-
.../org/apache/tez/dag/api/TestDAGPlan.java | 63 +++++-
.../tez/dag/api/TestDagTypeConverters.java | 196 ++++++++++++++++++-
.../org/apache/tez/dag/app/DAGAppMaster.java | 4 +-
12 files changed, 508 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/306020d2/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 9d72d92..9b3967a 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -40,5 +40,6 @@ ALL CHANGES:
TEZ-2652. Cleanup the way services are specified for an AM and vertices.
TEZ-2653. Change service contexts to expose a user specified payload instead of the AM configuration.
TEZ-2441. Add tests for TezTaskRunner2.
+ TEZ-2657. Add tests for client side changes - specifying plugins, etc.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/306020d2/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index f961291..7778ef3 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -94,13 +94,16 @@ public class TezClient {
@VisibleForTesting
static final String NO_CLUSTER_DIAGNOSTICS_MSG = "No cluster diagnostics found.";
- private final String clientName;
+ @VisibleForTesting
+ final String clientName;
private ApplicationId sessionAppId;
private ApplicationId lastSubmittedAppId;
- private AMConfiguration amConfig;
+ @VisibleForTesting
+ final AMConfiguration amConfig;
private FrameworkClient frameworkClient;
private String diagnostics;
- private boolean isSession;
+ @VisibleForTesting
+ final boolean isSession;
private boolean sessionStarted = false;
private boolean sessionStopped = false;
/** Tokens which will be required for all DAGs submitted to this session. */
@@ -112,8 +115,10 @@ public class TezClient {
private JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
private final Map<String, LocalResource> additionalLocalResources = Maps.newHashMap();
- private final TezApiVersionInfo apiVersionInfo;
- private final ServicePluginsDescriptor servicePluginsDescriptor;
+ @VisibleForTesting
+ final TezApiVersionInfo apiVersionInfo;
+ @VisibleForTesting
+ final ServicePluginsDescriptor servicePluginsDescriptor;
private HistoryACLPolicyManager historyACLPolicyManager;
private int preWarmDAGCounter = 0;
http://git-wip-us.apache.org/repos/asf/tez/blob/306020d2/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 9cf1f3f..6086fa1 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -39,9 +39,7 @@ import java.util.Map.Entry;
import com.google.common.base.Strings;
import org.apache.commons.lang.StringUtils;
-import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
-import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -778,47 +776,13 @@ public class TezClientUtils {
}
AMPluginDescriptorProto pluginDescriptorProto =
- createAMServicePluginDescriptorProto(servicePluginsDescriptor);
+ DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor);
builder.setAmPluginDescriptor(pluginDescriptorProto);
return builder.build();
}
- static AMPluginDescriptorProto createAMServicePluginDescriptorProto(
- ServicePluginsDescriptor servicePluginsDescriptor) {
- AMPluginDescriptorProto.Builder pluginDescriptorBuilder =
- AMPluginDescriptorProto.newBuilder();
- if (servicePluginsDescriptor != null) {
- pluginDescriptorBuilder.setContainersEnabled(servicePluginsDescriptor.areContainersEnabled());
- pluginDescriptorBuilder.setUberEnabled(servicePluginsDescriptor.isUberEnabled());
-
- if (servicePluginsDescriptor.getTaskSchedulerDescriptors() != null &&
- servicePluginsDescriptor.getTaskSchedulerDescriptors().length > 0) {
- List<TezNamedEntityDescriptorProto> namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto(
- servicePluginsDescriptor.getTaskSchedulerDescriptors());
- pluginDescriptorBuilder.addAllTaskScedulers(namedEntityProtos);
- }
-
- if (servicePluginsDescriptor.getContainerLauncherDescriptors() != null &&
- servicePluginsDescriptor.getContainerLauncherDescriptors().length > 0) {
- List<TezNamedEntityDescriptorProto> namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto(
- servicePluginsDescriptor.getContainerLauncherDescriptors());
- pluginDescriptorBuilder.addAllContainerLaunchers(namedEntityProtos);
- }
-
- if (servicePluginsDescriptor.getTaskCommunicatorDescriptors() != null &&
- servicePluginsDescriptor.getTaskCommunicatorDescriptors().length > 0) {
- List<TezNamedEntityDescriptorProto> namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto(
- servicePluginsDescriptor.getTaskCommunicatorDescriptors());
- pluginDescriptorBuilder.addAllTaskCommunicators(namedEntityProtos);
- }
-
- } else {
- pluginDescriptorBuilder.setContainersEnabled(true).setUberEnabled(false);
- }
- return pluginDescriptorBuilder.build();
- }
/**
* Helper function to create a YARN LocalResource
http://git-wip-us.apache.org/repos/asf/tez/blob/306020d2/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 2e0d417..61e4d33 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -52,9 +52,11 @@ import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezAppMasterStatusProto;
import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
@@ -74,14 +76,13 @@ import org.apache.tez.dag.api.records.DAGProtos.TezCounterProto;
import org.apache.tez.dag.api.records.DAGProtos.TezCountersProto;
import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.VertexExecutionContextProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexLocationHintProto;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.ByteString.Output;
-import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
-import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
-import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
@Private
public class DagTypeConverters {
@@ -732,13 +733,13 @@ public class DagTypeConverters {
return payload.getPayload();
}
- public static DAGProtos.VertexExecutionContextProto convertToProto(
- Vertex.VertexExecutionContext context) {
+ public static VertexExecutionContextProto convertToProto(
+ VertexExecutionContext context) {
if (context == null) {
return null;
} else {
- DAGProtos.VertexExecutionContextProto.Builder builder =
- DAGProtos.VertexExecutionContextProto.newBuilder();
+ VertexExecutionContextProto.Builder builder =
+ VertexExecutionContextProto.newBuilder();
builder.setExecuteInAm(context.shouldExecuteInAm());
builder.setExecuteInContainers(context.shouldExecuteInContainers());
if (context.getTaskSchedulerName() != null) {
@@ -754,26 +755,26 @@ public class DagTypeConverters {
}
}
- public static Vertex.VertexExecutionContext convertFromProto(
- DAGProtos.VertexExecutionContextProto proto) {
+ public static VertexExecutionContext convertFromProto(
+ VertexExecutionContextProto proto) {
if (proto == null) {
return null;
} else {
if (proto.getExecuteInAm()) {
- Vertex.VertexExecutionContext context =
- Vertex.VertexExecutionContext.createExecuteInAm(proto.getExecuteInAm());
+ VertexExecutionContext context =
+ VertexExecutionContext.createExecuteInAm(proto.getExecuteInAm());
return context;
} else if (proto.getExecuteInContainers()) {
- Vertex.VertexExecutionContext context =
- Vertex.VertexExecutionContext.createExecuteInContainers(proto.getExecuteInContainers());
+ VertexExecutionContext context =
+ VertexExecutionContext.createExecuteInContainers(proto.getExecuteInContainers());
return context;
} else {
String taskScheduler = proto.hasTaskSchedulerName() ? proto.getTaskSchedulerName() : null;
String containerLauncher =
proto.hasContainerLauncherName() ? proto.getContainerLauncherName() : null;
String taskComm = proto.hasTaskCommName() ? proto.getTaskCommName() : null;
- Vertex.VertexExecutionContext context =
- Vertex.VertexExecutionContext.create(taskScheduler, containerLauncher, taskComm);
+ VertexExecutionContext context =
+ VertexExecutionContext.create(taskScheduler, containerLauncher, taskComm);
return context;
}
}
@@ -800,4 +801,40 @@ public class DagTypeConverters {
return builder.build();
}
+ public static AMPluginDescriptorProto convertServicePluginDescriptoToProto(
+ ServicePluginsDescriptor servicePluginsDescriptor) {
+ AMPluginDescriptorProto.Builder pluginDescriptorBuilder =
+ AMPluginDescriptorProto.newBuilder();
+ if (servicePluginsDescriptor != null) {
+
+ pluginDescriptorBuilder.setContainersEnabled(servicePluginsDescriptor.areContainersEnabled());
+ pluginDescriptorBuilder.setUberEnabled(servicePluginsDescriptor.isUberEnabled());
+
+ if (servicePluginsDescriptor.getTaskSchedulerDescriptors() != null &&
+ servicePluginsDescriptor.getTaskSchedulerDescriptors().length > 0) {
+ List<TezNamedEntityDescriptorProto> namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto(
+ servicePluginsDescriptor.getTaskSchedulerDescriptors());
+ pluginDescriptorBuilder.addAllTaskSchedulers(namedEntityProtos);
+ }
+
+ if (servicePluginsDescriptor.getContainerLauncherDescriptors() != null &&
+ servicePluginsDescriptor.getContainerLauncherDescriptors().length > 0) {
+ List<TezNamedEntityDescriptorProto> namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto(
+ servicePluginsDescriptor.getContainerLauncherDescriptors());
+ pluginDescriptorBuilder.addAllContainerLaunchers(namedEntityProtos);
+ }
+
+ if (servicePluginsDescriptor.getTaskCommunicatorDescriptors() != null &&
+ servicePluginsDescriptor.getTaskCommunicatorDescriptors().length > 0) {
+ List<TezNamedEntityDescriptorProto> namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto(
+ servicePluginsDescriptor.getTaskCommunicatorDescriptors());
+ pluginDescriptorBuilder.addAllTaskCommunicators(namedEntityProtos);
+ }
+
+ } else {
+ pluginDescriptorBuilder.setContainersEnabled(true).setUberEnabled(false);
+ }
+ return pluginDescriptorBuilder.build();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/306020d2/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 34124b2..8953ae1 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -511,6 +511,47 @@ public class Vertex {
", taskCommName='" + taskCommName + '\'' +
'}';
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ VertexExecutionContext that = (VertexExecutionContext) o;
+
+ if (executeInAm != that.executeInAm) {
+ return false;
+ }
+ if (executeInContainers != that.executeInContainers) {
+ return false;
+ }
+ if (taskSchedulerName != null ? !taskSchedulerName.equals(that.taskSchedulerName) :
+ that.taskSchedulerName != null) {
+ return false;
+ }
+ if (containerLauncherName != null ?
+ !containerLauncherName.equals(that.containerLauncherName) :
+ that.containerLauncherName != null) {
+ return false;
+ }
+ return !(taskCommName != null ? !taskCommName.equals(that.taskCommName) :
+ that.taskCommName != null);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = (executeInAm ? 1 : 0);
+ result = 31 * result + (executeInContainers ? 1 : 0);
+ result = 31 * result + (taskSchedulerName != null ? taskSchedulerName.hashCode() : 0);
+ result = 31 * result + (containerLauncherName != null ? containerLauncherName.hashCode() : 0);
+ result = 31 * result + (taskCommName != null ? taskCommName.hashCode() : 0);
+ return result;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/306020d2/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
index 8df102a..2e4fc46 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
@@ -46,6 +46,15 @@ public class ServicePluginsDescriptor {
this.taskCommunicatorDescriptors = taskCommunicatorDescriptors;
}
+ /**
+ * Create a service plugin descriptor with the provided plugins. Regular containers will also be enabled
+ * when using this method.
+ *
+ * @param taskSchedulerDescriptor the task scheduler plugin descriptors
+ * @param containerLauncherDescriptors the container launcher plugin descriptors
+ * @param taskCommunicatorDescriptors the task communicator plugin descriptors
+ * @return
+ */
public static ServicePluginsDescriptor create(TaskSchedulerDescriptor[] taskSchedulerDescriptor,
ContainerLauncherDescriptor[] containerLauncherDescriptors,
TaskCommunicatorDescriptor[] taskCommunicatorDescriptors) {
@@ -53,6 +62,15 @@ public class ServicePluginsDescriptor {
containerLauncherDescriptors, taskCommunicatorDescriptors);
}
+ /**
+ * Create a service plugin descriptor with the provided plugins. Also allows specification of whether
+ * in-AM execution is enabled. Container execution is enabled by default.
+ * @param enableUber whether to enable execution in the AM or not
+ * @param taskSchedulerDescriptor the task scheduler plugin descriptors
+ * @param containerLauncherDescriptors the container launcher plugin descriptors
+ * @param taskCommunicatorDescriptors the task communicator plugin descriptors
+ * @return
+ */
public static ServicePluginsDescriptor create(boolean enableUber,
TaskSchedulerDescriptor[] taskSchedulerDescriptor,
ContainerLauncherDescriptor[] containerLauncherDescriptors,
@@ -61,6 +79,17 @@ public class ServicePluginsDescriptor {
containerLauncherDescriptors, taskCommunicatorDescriptors);
}
+ /**
+ * Create a service plugin descriptor with the provided plugins. Also allows specification of whether
+ * container execution and in-AM execution will be enabled.
+ *
+ * @param enableContainers whether to enable execution in containers
+ * @param enableUber whether to enable execution in the AM or not
+ * @param taskSchedulerDescriptor the task scheduler plugin descriptors
+ * @param containerLauncherDescriptors the container launcher plugin descriptors
+ * @param taskCommunicatorDescriptors the task communicator plugin descriptors
+ * @return
+ */
public static ServicePluginsDescriptor create(boolean enableContainers, boolean enableUber,
TaskSchedulerDescriptor[] taskSchedulerDescriptor,
ContainerLauncherDescriptor[] containerLauncherDescriptors,
@@ -69,6 +98,13 @@ public class ServicePluginsDescriptor {
containerLauncherDescriptors, taskCommunicatorDescriptors);
}
+ /**
+ * Create a service plugin descriptor which may have in-AM execution of tasks enabled. Container
+ * execution is enabled by default
+ *
+ * @param enableUber whether to enable execution in the AM or not
+ * @return
+ */
public static ServicePluginsDescriptor create(boolean enableUber) {
return new ServicePluginsDescriptor(true, enableUber, null, null, null);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/306020d2/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index ebe3259..193f7b8 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -180,7 +180,7 @@ message TezNamedEntityDescriptorProto {
message AMPluginDescriptorProto {
optional bool containers_enabled = 1 [default = true];
optional bool uber_enabled = 2 [default = false];
- repeated TezNamedEntityDescriptorProto task_scedulers = 3;
+ repeated TezNamedEntityDescriptorProto task_schedulers = 3;
repeated TezNamedEntityDescriptorProto container_launchers = 4;
repeated TezNamedEntityDescriptorProto task_communicators = 5;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/306020d2/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index dc0fbb1..7a642e6 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -26,6 +26,11 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
@@ -61,6 +66,7 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRespo
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezAppMasterStatusProto;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
@@ -153,11 +159,11 @@ public class TestTezClient {
verify(client.mockYarnClient, times(1)).submitApplication(captor.capture());
ApplicationSubmissionContext context = captor.getValue();
Assert.assertEquals(3, context.getAMContainerSpec().getLocalResources().size());
- Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+ assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
- Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+ assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
TezConstants.TEZ_PB_BINARY_CONF_NAME));
- Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+ assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
lrName1));
} else {
verify(client.mockYarnClient, times(0)).submitApplication(captor.capture());
@@ -172,7 +178,7 @@ public class TestTezClient {
DAG dag = DAG.create("DAG").addVertex(vertex).addTaskLocalFiles(lrDAG);
DAGClient dagClient = client.submitDAG(dag);
- Assert.assertTrue(dagClient.getExecutionContext().contains(client.mockAppId.toString()));
+ assertTrue(dagClient.getExecutionContext().contains(client.mockAppId.toString()));
if (isSession) {
verify(client.mockYarnClient, times(1)).submitApplication(captor.capture());
@@ -181,13 +187,13 @@ public class TestTezClient {
verify(client.mockYarnClient, times(1)).submitApplication(captor.capture());
ApplicationSubmissionContext context = captor.getValue();
Assert.assertEquals(4, context.getAMContainerSpec().getLocalResources().size());
- Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+ assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
- Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+ assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
TezConstants.TEZ_PB_BINARY_CONF_NAME));
- Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+ assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
TezConstants.TEZ_PB_PLAN_BINARY_NAME));
- Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+ assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
lrName1));
}
@@ -211,7 +217,7 @@ public class TestTezClient {
if (isSession) {
// same app master
verify(client.mockYarnClient, times(1)).submitApplication(captor.capture());
- Assert.assertTrue(dagClient.getExecutionContext().contains(client.mockAppId.toString()));
+ assertTrue(dagClient.getExecutionContext().contains(client.mockAppId.toString()));
// additional resource is sent
ArgumentCaptor<SubmitDAGRequestProto> captor1 = ArgumentCaptor.forClass(SubmitDAGRequestProto.class);
verify(client.sessionAmProxy, times(2)).submitDAG((RpcController)any(), captor1.capture());
@@ -220,20 +226,20 @@ public class TestTezClient {
Assert.assertEquals(lrName2, proto.getAdditionalAmResources().getLocalResources(0).getName());
} else {
// new app master
- Assert.assertTrue(dagClient.getExecutionContext().contains(appId2.toString()));
+ assertTrue(dagClient.getExecutionContext().contains(appId2.toString()));
verify(client.mockYarnClient, times(2)).submitApplication(captor.capture());
// additional resource is added
ApplicationSubmissionContext context = captor.getValue();
Assert.assertEquals(5, context.getAMContainerSpec().getLocalResources().size());
- Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+ assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
- Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+ assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
TezConstants.TEZ_PB_BINARY_CONF_NAME));
- Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+ assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
TezConstants.TEZ_PB_PLAN_BINARY_NAME));
- Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+ assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
lrName1));
- Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
+ assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
lrName2));
}
@@ -263,7 +269,7 @@ public class TestTezClient {
ArgumentCaptor<SubmitDAGRequestProto> captor1 = ArgumentCaptor.forClass(SubmitDAGRequestProto.class);
verify(client.sessionAmProxy, times(1)).submitDAG((RpcController)any(), captor1.capture());
SubmitDAGRequestProto proto = captor1.getValue();
- Assert.assertTrue(proto.getDAGPlan().getName().startsWith(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX));
+ assertTrue(proto.getDAGPlan().getName().startsWith(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX));
client.stop();
}
@@ -330,7 +336,7 @@ public class TestTezClient {
thread.join(250);
thread.interrupt();
thread.join();
- Assert.assertThat(exceptionReference.get(),CoreMatchers. instanceOf(InterruptedException.class));
+ Assert.assertThat(exceptionReference.get(), CoreMatchers.instanceOf(InterruptedException.class));
client.stop();
}
@@ -347,7 +353,7 @@ public class TestTezClient {
client.waitTillReady();
Assert.fail();
} catch (SessionNotRunning e) {
- Assert.assertTrue(e.getMessage().contains(msg));
+ assertTrue(e.getMessage().contains(msg));
}
client.stop();
}
@@ -362,7 +368,7 @@ public class TestTezClient {
client.waitTillReady();
Assert.fail();
} catch (SessionNotRunning e) {
- Assert.assertTrue(e.getMessage().contains(TezClient.NO_CLUSTER_DIAGNOSTICS_MSG));
+ assertTrue(e.getMessage().contains(TezClient.NO_CLUSTER_DIAGNOSTICS_MSG));
}
client.stop();
}
@@ -387,9 +393,76 @@ public class TestTezClient {
client.submitDAG(dag);
Assert.fail();
} catch (SessionNotRunning e) {
- Assert.assertTrue(e.getMessage().contains(msg));
+ assertTrue(e.getMessage().contains(msg));
}
client.stop();
}
+ @Test(timeout = 5000)
+ public void testClientBuilder() {
+ TezConfiguration tezConfWitSession = new TezConfiguration();
+ tezConfWitSession.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
+
+ TezConfiguration tezConfNoSession = new TezConfiguration();
+ tezConfNoSession.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false);
+
+ AMConfiguration amConf;
+ TezClient tezClient;
+ Credentials credentials = new Credentials();
+ Map<String, LocalResource> localResourceMap = new HashMap<>();
+ localResourceMap.put("testResource", mock(LocalResource.class));
+ ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true);
+
+ // Session mode via conf
+ tezClient = TezClient.newBuilder("client", tezConfWitSession).build();
+ assertTrue(tezClient.isSession);
+ assertNull(tezClient.servicePluginsDescriptor);
+ assertNotNull(tezClient.apiVersionInfo);
+ amConf = tezClient.amConfig;
+ assertNotNull(amConf);
+ assertEquals(0, amConf.getAMLocalResources().size());
+ assertNull(amConf.getCredentials());
+ assertTrue(
+ amConf.getTezConfiguration().getBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false));
+
+ // Non-Session mode via conf
+ tezClient = TezClient.newBuilder("client", tezConfNoSession).build();
+ assertFalse(tezClient.isSession);
+ assertNull(tezClient.servicePluginsDescriptor);
+ assertNotNull(tezClient.apiVersionInfo);
+ amConf = tezClient.amConfig;
+ assertNotNull(amConf);
+ assertEquals(0, amConf.getAMLocalResources().size());
+ assertNull(amConf.getCredentials());
+ assertFalse(
+ amConf.getTezConfiguration().getBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true));
+
+ // no-session via config. API explicit session.
+ tezClient = TezClient.newBuilder("client", tezConfNoSession).setIsSession(true).build();
+ assertTrue(tezClient.isSession);
+ assertNull(tezClient.servicePluginsDescriptor);
+ assertNotNull(tezClient.apiVersionInfo);
+ amConf = tezClient.amConfig;
+ assertNotNull(amConf);
+ assertEquals(0, amConf.getAMLocalResources().size());
+ assertNull(amConf.getCredentials());
+ assertTrue(
+ amConf.getTezConfiguration().getBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false));
+
+ // Plugins, credentials, local resources
+ tezClient = TezClient.newBuilder("client", tezConfWitSession).setCredentials(credentials)
+ .setLocalResources(localResourceMap).setServicePluginDescriptor(servicePluginsDescriptor)
+ .build();
+ assertTrue(tezClient.isSession);
+ assertEquals(servicePluginsDescriptor, tezClient.servicePluginsDescriptor);
+ assertNotNull(tezClient.apiVersionInfo);
+ amConf = tezClient.amConfig;
+ assertNotNull(amConf);
+ assertEquals(1, amConf.getAMLocalResources().size());
+ assertEquals(localResourceMap, amConf.getAMLocalResources());
+ assertEquals(credentials, amConf.getCredentials());
+ assertTrue(
+ amConf.getTezConfiguration().getBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/306020d2/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
index 8946ef0..8f40bbd 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
@@ -70,6 +70,7 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.junit.Assert;
import org.junit.Test;
/**
@@ -500,7 +501,8 @@ public class TestTezClientUtils {
Assert.assertNotNull(javaOpts);
Assert.assertTrue(javaOpts.contains("-D" + TezConstants.TEZ_ROOT_LOGGER_NAME + "=FOOBAR")
&& javaOpts.contains(TezConstants.TEZ_CONTAINER_LOG4J_PROPERTIES_FILE)
- && javaOpts.contains("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator"));
+ &&
+ javaOpts.contains("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator"));
}
@Test (timeout = 5000)
@@ -677,6 +679,16 @@ public class TestTezClientUtils {
Assert.assertTrue(resourceNames.contains("dir2-f.txt"));
}
- // TODO TEZ-2003 Add test to validate ServicePluginDescriptor propagation
+ @Test(timeout = 5000)
+ public void testServiceDescriptorSerializationForAM() {
+ Configuration conf = new Configuration(false);
+ ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true);
+
+ ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf, null,
+ servicePluginsDescriptor);
+
+ assertTrue(confProto.hasAmPluginDescriptor());
+ assertTrue(confProto.getAmPluginDescriptor().getUberEnabled());
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/306020d2/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index fccbb08..cd42109 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -37,11 +37,14 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
+import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
+import org.apache.tez.dag.api.records.DAGProtos.VertexExecutionContextProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.junit.Assert;
import org.junit.Rule;
@@ -131,7 +134,8 @@ public class TestDAGPlan {
EdgeManagerPluginDescriptor emDesc = edgeProperty.getEdgeManagerDescriptor();
Assert.assertNotNull(emDesc);
Assert.assertEquals("emClass", emDesc.getClassName());
- Assert.assertTrue(Arrays.equals("emPayload".getBytes(), emDesc.getUserPayload().deepCopyAsArray()));
+ Assert.assertTrue(
+ Arrays.equals("emPayload".getBytes(), emDesc.getUserPayload().deepCopyAsArray()));
}
@Test(timeout = 5000)
@@ -311,4 +315,61 @@ public class TestDAGPlan {
assertNotNull(fetchedCredentials.getToken(new Text("Token1")));
assertNotNull(fetchedCredentials.getToken(new Text("Token2")));
}
+
+ @Test(timeout = 5000)
+ public void testServiceDescriptorPropagation() {
+ DAG dag = DAG.create("testDag");
+ ProcessorDescriptor pd1 = ProcessorDescriptor.create("processor1").
+ setUserPayload(UserPayload.create(ByteBuffer.wrap("processor1Bytes".getBytes())));
+ ProcessorDescriptor pd2 = ProcessorDescriptor.create("processor2").
+ setUserPayload(UserPayload.create(ByteBuffer.wrap("processor2Bytes".getBytes())));
+
+ VertexExecutionContext defaultExecutionContext =
+ VertexExecutionContext.create("plugin", "plugin", "plugin");
+ VertexExecutionContext v1Context = VertexExecutionContext.createExecuteInAm(true);
+
+
+ Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1)).setExecutionContext(v1Context);
+ Vertex v2 = Vertex.create("v2", pd2, 1, Resource.newInstance(1024, 1));
+ v1.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
+ .addTaskLocalFiles(new HashMap<String, LocalResource>());
+ v2.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
+ .addTaskLocalFiles(new HashMap<String, LocalResource>());
+
+ InputDescriptor inputDescriptor = InputDescriptor.create("input").
+ setUserPayload(UserPayload.create(ByteBuffer.wrap("inputBytes".getBytes())));
+ OutputDescriptor outputDescriptor = OutputDescriptor.create("output").
+ setUserPayload(UserPayload.create(ByteBuffer.wrap("outputBytes".getBytes())));
+ Edge edge = Edge.create(v1, v2, EdgeProperty.create(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, outputDescriptor, inputDescriptor));
+
+ dag.addVertex(v1).addVertex(v2).addEdge(edge);
+ dag.setExecutionContext(defaultExecutionContext);
+
+ DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true);
+
+ assertEquals(2, dagProto.getVertexCount());
+ assertEquals(1, dagProto.getEdgeCount());
+
+ assertTrue(dagProto.hasDefaultExecutionContext());
+ VertexExecutionContextProto defaultContextProto = dagProto.getDefaultExecutionContext();
+ assertFalse(defaultContextProto.getExecuteInContainers());
+ assertFalse(defaultContextProto.getExecuteInAm());
+ assertEquals("plugin", defaultContextProto.getTaskSchedulerName());
+ assertEquals("plugin", defaultContextProto.getContainerLauncherName());
+ assertEquals("plugin", defaultContextProto.getTaskCommName());
+
+ VertexPlan v1Proto = dagProto.getVertex(0);
+ assertTrue(v1Proto.hasExecutionContext());
+ VertexExecutionContextProto v1ContextProto = v1Proto.getExecutionContext();
+ assertFalse(v1ContextProto.getExecuteInContainers());
+ assertTrue(v1ContextProto.getExecuteInAm());
+ assertFalse(v1ContextProto.hasTaskSchedulerName());
+ assertFalse(v1ContextProto.hasContainerLauncherName());
+ assertFalse(v1ContextProto.hasTaskCommName());
+
+ VertexPlan v2Proto = dagProto.getVertex(1);
+ assertFalse(v2Proto.hasExecutionContext());
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/306020d2/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
index 51b179a..e37f849 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
@@ -18,15 +18,32 @@
package org.apache.tez.dag.api;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
+import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.VertexExecutionContextProto;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.junit.Assert;
import org.junit.Test;
@@ -43,7 +60,7 @@ public class TestDagTypeConverters {
DagTypeConverters.convertToDAGPlan(entityDescriptor);
Assert.assertEquals(payload.getVersion(), proto.getTezUserPayload().getVersion());
Assert.assertArrayEquals(payload.deepCopyAsArray(), proto.getTezUserPayload().getUserPayload().toByteArray());
- Assert.assertTrue(proto.hasHistoryText());
+ assertTrue(proto.hasHistoryText());
Assert.assertNotEquals(historytext, proto.getHistoryText());
Assert.assertEquals(historytext, new String(
TezCommonUtils.decompressByteStringToByteArray(proto.getHistoryText())));
@@ -89,4 +106,181 @@ public class TestDagTypeConverters {
Assert.assertEquals(2311, lr2UrlDeserialized.getPort());
}
+
+ @Test(timeout = 5000)
+ public void testVertexExecutionContextTranslation() {
+ VertexExecutionContext originalContext;
+ VertexExecutionContextProto contextProto;
+ VertexExecutionContext retrievedContext;
+
+
+ // Uber
+ originalContext = VertexExecutionContext.createExecuteInAm(true);
+ contextProto = DagTypeConverters.convertToProto(originalContext);
+ retrievedContext = DagTypeConverters.convertFromProto(contextProto);
+ assertEquals(originalContext, retrievedContext);
+
+ // Regular containers
+ originalContext = VertexExecutionContext.createExecuteInContainers(true);
+ contextProto = DagTypeConverters.convertToProto(originalContext);
+ retrievedContext = DagTypeConverters.convertFromProto(contextProto);
+ assertEquals(originalContext, retrievedContext);
+
+ // Custom
+ originalContext = VertexExecutionContext.create("plugin", "plugin", "plugin");
+ contextProto = DagTypeConverters.convertToProto(originalContext);
+ retrievedContext = DagTypeConverters.convertFromProto(contextProto);
+ assertEquals(originalContext, retrievedContext);
+ }
+
+
+ static final String testScheduler = "testScheduler";
+ static final String testLauncher = "testLauncher";
+ static final String testComm = "testComm";
+ static final String classSuffix = "_class";
+
+ @Test(timeout = 5000)
+ public void testServiceDescriptorTranslation() {
+
+
+ TaskSchedulerDescriptor[] taskSchedulers;
+ ContainerLauncherDescriptor[] containerLaunchers;
+ TaskCommunicatorDescriptor[] taskComms;
+
+ ServicePluginsDescriptor servicePluginsDescriptor;
+ AMPluginDescriptorProto proto;
+
+ // Uber-execution
+ servicePluginsDescriptor = ServicePluginsDescriptor.create(true);
+ proto = DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor);
+ assertTrue(proto.hasUberEnabled());
+ assertTrue(proto.hasContainersEnabled());
+ assertTrue(proto.getUberEnabled());
+ assertTrue(proto.getContainersEnabled());
+ assertEquals(0, proto.getTaskSchedulersCount());
+ assertEquals(0, proto.getContainerLaunchersCount());
+ assertEquals(0, proto.getTaskCommunicatorsCount());
+
+ // Single plugin set specified. One with a payload.
+ taskSchedulers = createTaskScheduelrs(1, false);
+ containerLaunchers = createContainerLaunchers(1, false);
+ taskComms = createTaskCommunicators(1, true);
+
+ servicePluginsDescriptor = ServicePluginsDescriptor.create(taskSchedulers, containerLaunchers,
+ taskComms);
+ proto = DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor);
+ assertTrue(proto.hasUberEnabled());
+ assertTrue(proto.hasContainersEnabled());
+ assertFalse(proto.getUberEnabled());
+ assertTrue(proto.getContainersEnabled());
+ verifyPlugins(proto.getTaskSchedulersList(), 1, testScheduler, false);
+ verifyPlugins(proto.getContainerLaunchersList(), 1, testLauncher, false);
+ verifyPlugins(proto.getTaskCommunicatorsList(), 1, testComm, true);
+
+
+ // Multiple plugin set specified. All with a payload
+ taskSchedulers = createTaskScheduelrs(3, true);
+ containerLaunchers = createContainerLaunchers(3, true);
+ taskComms = createTaskCommunicators(3, true);
+
+ servicePluginsDescriptor = ServicePluginsDescriptor.create(taskSchedulers, containerLaunchers,
+ taskComms);
+ proto = DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor);
+ assertTrue(proto.hasUberEnabled());
+ assertTrue(proto.hasContainersEnabled());
+ assertFalse(proto.getUberEnabled());
+ assertTrue(proto.getContainersEnabled());
+ verifyPlugins(proto.getTaskSchedulersList(), 3, testScheduler, true);
+ verifyPlugins(proto.getContainerLaunchersList(), 3, testLauncher, true);
+ verifyPlugins(proto.getTaskCommunicatorsList(), 3, testComm, true);
+
+ // Single plugin set specified. One with a payload. No container execution. Uber enabled.
+ taskSchedulers = createTaskScheduelrs(1, false);
+ containerLaunchers = createContainerLaunchers(1, false);
+ taskComms = createTaskCommunicators(1, true);
+
+ servicePluginsDescriptor = ServicePluginsDescriptor.create(false, true, taskSchedulers, containerLaunchers,
+ taskComms);
+ proto = DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor);
+ assertTrue(proto.hasUberEnabled());
+ assertTrue(proto.hasContainersEnabled());
+ assertTrue(proto.getUberEnabled());
+ assertFalse(proto.getContainersEnabled());
+ verifyPlugins(proto.getTaskSchedulersList(), 1, testScheduler, false);
+ verifyPlugins(proto.getContainerLaunchersList(), 1, testLauncher, false);
+ verifyPlugins(proto.getTaskCommunicatorsList(), 1, testComm, true);
+ }
+
+ private void verifyPlugins(List<TezNamedEntityDescriptorProto> entities, int expectedCount,
+ String baseString, boolean hasPayload) {
+ assertEquals(expectedCount, entities.size());
+ for (int i = 0; i < expectedCount; i++) {
+ assertEquals(indexedEntity(baseString, i), entities.get(i).getName());
+ TezEntityDescriptorProto subEntityProto = entities.get(i).getEntityDescriptor();
+ assertEquals(append(indexedEntity(baseString, i), classSuffix),
+ subEntityProto.getClassName());
+ assertEquals(hasPayload, subEntityProto.hasTezUserPayload());
+ if (hasPayload) {
+ UserPayload userPayload =
+ UserPayload
+ .create(subEntityProto.getTezUserPayload().getUserPayload().asReadOnlyByteBuffer(),
+ subEntityProto.getTezUserPayload().getVersion());
+ ByteBuffer bb = userPayload.getPayload();
+ assertNotNull(bb);
+ assertEquals(i, bb.getInt());
+ }
+ }
+ }
+
+ private TaskSchedulerDescriptor[] createTaskScheduelrs(int count, boolean withUserPayload) {
+ TaskSchedulerDescriptor[] descriptors = new TaskSchedulerDescriptor[count];
+ for (int i = 0; i < count; i++) {
+ descriptors[i] = TaskSchedulerDescriptor.create(indexedEntity(testScheduler, i),
+ append(indexedEntity(testScheduler, i), classSuffix));
+ if (withUserPayload) {
+ descriptors[i].setUserPayload(createPayload(i));
+ }
+ }
+ return descriptors;
+ }
+
+ private ContainerLauncherDescriptor[] createContainerLaunchers(int count,
+ boolean withUserPayload) {
+ ContainerLauncherDescriptor[] descriptors = new ContainerLauncherDescriptor[count];
+ for (int i = 0; i < count; i++) {
+ descriptors[i] = ContainerLauncherDescriptor.create(indexedEntity(testLauncher, i),
+ append(indexedEntity(testLauncher, i), classSuffix));
+ if (withUserPayload) {
+ descriptors[i].setUserPayload(createPayload(i));
+ }
+ }
+ return descriptors;
+ }
+
+ private TaskCommunicatorDescriptor[] createTaskCommunicators(int count, boolean withUserPayload) {
+ TaskCommunicatorDescriptor[] descriptors = new TaskCommunicatorDescriptor[count];
+ for (int i = 0; i < count; i++) {
+ descriptors[i] = TaskCommunicatorDescriptor.create(indexedEntity(testComm, i),
+ append(indexedEntity(testComm, i), classSuffix));
+ if (withUserPayload) {
+ descriptors[i].setUserPayload(createPayload(i));
+ }
+ }
+ return descriptors;
+ }
+
+ private static UserPayload createPayload(int i) {
+ ByteBuffer bb = ByteBuffer.allocate(4);
+ bb.putInt(0, i);
+ UserPayload payload = UserPayload.create(bb);
+ return payload;
+ }
+
+ private String indexedEntity(String name, int index) {
+ return name + index;
+ }
+
+ private String append(String s1, String s2) {
+ return s1 + s2;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/306020d2/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 4128841..9b16a90 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -409,9 +409,9 @@ public class DAGAppMaster extends AbstractService {
}
taskSchedulerDescriptors = parsePlugin(taskSchedulers,
- (amPluginDescriptorProto == null || amPluginDescriptorProto.getTaskScedulersCount() == 0 ?
+ (amPluginDescriptorProto == null || amPluginDescriptorProto.getTaskSchedulersCount() == 0 ?
null :
- amPluginDescriptorProto.getTaskScedulersList()),
+ amPluginDescriptorProto.getTaskSchedulersList()),
tezYarnEnabled, uberEnabled);
containerLauncherDescriptors = parsePlugin(containerLaunchers,