You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/02/10 22:52:22 UTC
[10/17] tez git commit: TEZ-1893. Verify invalid -1 parallelism in
DAG.verify() (zjffdu)
TEZ-1893. Verify invalid -1 parallelism in DAG.verify() (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3da9566d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3da9566d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3da9566d
Branch: refs/heads/TEZ-2003
Commit: 3da9566de60f6be2d03465b4f4cd2724e7da8c4f
Parents: 20bf31c
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Feb 9 09:46:50 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Mon Feb 9 09:46:50 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../main/java/org/apache/tez/dag/api/DAG.java | 42 +++++++++
.../apache/tez/client/TestTezClientUtils.java | 4 +-
.../org/apache/tez/dag/api/TestDAGVerify.java | 98 +++++++++++++++++++-
4 files changed, 142 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/3da9566d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index df94b54..a1b42a8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -181,6 +181,7 @@ TEZ-UI CHANGES (TEZ-8):
Release 0.5.4: Unreleased
ALL CHANGES:
+ TEZ-1893. Verify invalid -1 parallelism in DAG.verify().
TEZ-900. Confusing message for incorrect queue for some tez examples.
TEZ-2036. OneToOneEdgeManager should enforce that source and destination
tasks have same number
http://git-wip-us.apache.org/repos/asf/tez/blob/3da9566d/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index c6a4f4a..7be5ba4 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -378,6 +378,48 @@ public class DAG {
}
}
}
+
+ // check the vertices with -1 parallelism, currently only 3 cases are allowed to have -1 parallelism.
+ // It is OK not using topological order to check vertices here.
+ // 1. has input initializers
+ // 2. 1-1 uninited sources
+ // 3. has custom vertex manager
+ for (Vertex vertex : vertices.values()) {
+ if (vertex.getParallelism() == -1) {
+ boolean hasInputInititlaizer = false;
+ if (vertex.getDataSources()!= null && !vertex.getDataSources().isEmpty()) {
+ for (DataSourceDescriptor ds : vertex.getDataSources()) {
+ if (ds.getInputInitializerDescriptor() != null) {
+ hasInputInititlaizer = true;
+ break;
+ }
+ }
+ }
+ if (hasInputInititlaizer) {
+ continue;
+ }
+
+ boolean has1to1UninitedSources = false;
+ if (vertex.getInputVertices()!= null && !vertex.getInputVertices().isEmpty()) {
+ for (Vertex srcVertex : vertex.getInputVertices()) {
+ if (srcVertex.getParallelism() == -1) {
+ has1to1UninitedSources = true;
+ break;
+ }
+ }
+ }
+ if (has1to1UninitedSources) {
+ continue;
+ }
+
+ if (vertex.getVertexManagerPlugin() != null) {
+ continue;
+ }
+ throw new IllegalStateException(vertex.getName() +
+ " has -1 tasks but does not have input initializers, " +
+ "1-1 uninited sources or custom vertex manager to set it at runtime");
+ }
+ }
}
// AnnotatedVertex is used by verify()
http://git-wip-us.apache.org/repos/asf/tez/blob/3da9566d/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
index 8cc55ca..468361b 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
@@ -203,7 +203,7 @@ public class TestTezClientUtils {
ApplicationId appId = ApplicationId.newInstance(1000, 1);
DAG dag = DAG.create("testdag");
- dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"))
+ dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
.setTaskLaunchCmdOpts("initialLaunchOpts"));
AMConfiguration amConf =
new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), new Credentials());
@@ -239,7 +239,7 @@ public class TestTezClientUtils {
ApplicationId appId = ApplicationId.newInstance(1000, 1);
DAG dag = DAG.create("testdag");
- dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"))
+ dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
.setTaskLaunchCmdOpts("initialLaunchOpts"));
AMConfiguration amConf =
new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), new Credentials());
http://git-wip-us.apache.org/repos/asf/tez/blob/3da9566d/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 4bf594f..74e780c 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -50,7 +50,9 @@ public class TestDAGVerify {
private final String dummyProcessorClassName = TestDAGVerify.class.getName();
private final String dummyInputClassName = TestDAGVerify.class.getName();
+ private final String dummyInputInitClassName = TestDAGVerify.class.getName();
private final String dummyOutputClassName = TestDAGVerify.class.getName();
+ private final String dummyVMPluginClassName = TestDAGVerify.class.getName();
private final int dummyTaskCount = 2;
private final Resource dummyTaskResource = Resource.newInstance(1, 1);
@@ -190,6 +192,9 @@ public class TestDAGVerify {
Vertex v1 = Vertex.create("v1",
ProcessorDescriptor.create(dummyProcessorClassName),
-1, dummyTaskResource);
+ DataSourceDescriptor dsDesc = DataSourceDescriptor.create(InputDescriptor.create(dummyInputClassName),
+ InputInitializerDescriptor.create(dummyInputInitClassName), null);
+ v1.addDataSource("input_1", dsDesc);
Vertex v2 = Vertex.create("v2",
ProcessorDescriptor.create("MapProcessor"),
-1, dummyTaskResource);
@@ -957,7 +962,7 @@ public class TestDAGVerify {
taskLocationHints.add(TaskLocationHint.createTaskLocationHint(hosts, null));
VertexLocationHint vLoc = VertexLocationHint.create(taskLocationHints);
DataSourceDescriptor ds = DataSourceDescriptor.create(InputDescriptor.create("I.class"),
- null, dummyTaskCount, null, vLoc, lrs2);
+ InputInitializerDescriptor.create(dummyInputInitClassName), dummyTaskCount, null, vLoc, lrs2);
v1.addDataSource("i1", ds);
DAG dag = DAG.create("testDag");
@@ -1056,4 +1061,95 @@ public class TestDAGVerify {
Assert.assertTrue(foundModifyAcls);
}
+ // verify() must reject v1's -1 parallelism until v1 gets an input initializer
+ @Test(timeout = 5000)
+ public void testDAGInvalidParallelism1() {
+ DAG dag = DAG.create("testDAG");
+ Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create(dummyProcessorClassName));
+ dag.addVertex(v1);
+ try {
+ dag.verify();
+ Assert.fail("Expected verify() to reject v1 with -1 parallelism");
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(
+ "v1 has -1 tasks but does not have input initializers, 1-1 uninited sources or custom vertex manager to set it at runtime",
+ e.getMessage());
+ }
+
+ DataSourceDescriptor dsDesc = DataSourceDescriptor.create(InputDescriptor.create(dummyInputClassName),
+ InputInitializerDescriptor.create(dummyInputInitClassName), null);
+ v1.addDataSource("input_1", dsDesc);
+ dag.verify();
+ }
+
+ // verify() must reject v1's -1 parallelism until v1 gets a custom vertex manager
+ @Test(timeout = 5000)
+ public void testDAGInvalidParallelism2() {
+ DAG dag = DAG.create("testDAG");
+ Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create(dummyProcessorClassName));
+ dag.addVertex(v1);
+ try {
+ dag.verify();
+ Assert.fail("Expected verify() to reject v1 with -1 parallelism");
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(
+ "v1 has -1 tasks but does not have input initializers, 1-1 uninited sources or custom vertex manager to set it at runtime",
+ e.getMessage());
+ }
+
+ v1.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(dummyVMPluginClassName));
+ dag.verify();
+ }
+
+ // v1 has a 1-1 uninited source vertex v0 which has an input initializer
+ @Test(timeout = 5000)
+ public void testDAGInvalidParallelism3() {
+ DAG dag = DAG.create("testDAG");
+ Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create(dummyProcessorClassName));
+ dag.addVertex(v1);
+ try {
+ dag.verify();
+ Assert.fail("Expected verify() to reject v1 with -1 parallelism");
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(
+ "v1 has -1 tasks but does not have input initializers, 1-1 uninited sources or custom vertex manager to set it at runtime",
+ e.getMessage());
+ }
+
+ Vertex v0 = Vertex.create("v0", ProcessorDescriptor.create(dummyProcessorClassName));
+ DataSourceDescriptor dsDesc = DataSourceDescriptor.create(InputDescriptor.create(dummyInputClassName),
+ InputInitializerDescriptor.create(dummyInputInitClassName), null);
+ v0.addDataSource("input", dsDesc);
+ dag.addVertex(v0);
+ dag.addEdge(Edge.create(v0, v1, EdgeProperty.create(DataMovementType.ONE_TO_ONE,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create(dummyOutputClassName),
+ InputDescriptor.create(dummyInputClassName))));
+ dag.verify();
+ }
+
+ // v1 has a 1-1 uninited parent v0 which has a custom vertex manager
+ @Test(timeout = 5000)
+ public void testDAGInvalidParallelism4() {
+ DAG dag = DAG.create("testDAG");
+ Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create(dummyProcessorClassName));
+ dag.addVertex(v1);
+ try {
+ dag.verify();
+ Assert.fail("Expected verify() to reject v1 with -1 parallelism");
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(
+ "v1 has -1 tasks but does not have input initializers, 1-1 uninited sources or custom vertex manager to set it at runtime",
+ e.getMessage());
+ }
+
+ Vertex v0 = Vertex.create("v0", ProcessorDescriptor.create(dummyProcessorClassName));
+ v0.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(dummyVMPluginClassName));
+ dag.addVertex(v0);
+ dag.addEdge(Edge.create(v0, v1, EdgeProperty.create(DataMovementType.ONE_TO_ONE,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create(dummyOutputClassName),
+ InputDescriptor.create(dummyInputClassName))));
+ dag.verify();
+ }
}