You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/02/10 22:52:22 UTC

[10/17] tez git commit: TEZ-1893. Verify invalid -1 parallelism in DAG.verify() (zjffdu)

TEZ-1893. Verify invalid -1 parallelism in DAG.verify() (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3da9566d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3da9566d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3da9566d

Branch: refs/heads/TEZ-2003
Commit: 3da9566de60f6be2d03465b4f4cd2724e7da8c4f
Parents: 20bf31c
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Feb 9 09:46:50 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Mon Feb 9 09:46:50 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../main/java/org/apache/tez/dag/api/DAG.java   | 42 +++++++++
 .../apache/tez/client/TestTezClientUtils.java   |  4 +-
 .../org/apache/tez/dag/api/TestDAGVerify.java   | 98 +++++++++++++++++++-
 4 files changed, 142 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3da9566d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index df94b54..a1b42a8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -181,6 +181,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-1893. Verify invalid -1 parallelism in DAG.verify().
   TEZ-900. Confusing message for incorrect queue for some tez examples.
   TEZ-2036. OneToOneEdgeManager should enforce that source and destination
   tasks have same number

http://git-wip-us.apache.org/repos/asf/tez/blob/3da9566d/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index c6a4f4a..7be5ba4 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -378,6 +378,48 @@ public class DAG {
         }
       }
     }
+
+    // Check the vertices with -1 parallelism; currently only 3 cases are allowed to have -1 parallelism.
+    // It is OK not to use topological order to check vertices here.
+    // 1. has input initializers
+    // 2. 1-1 uninited sources
+    // 3. has custom vertex manager
+    for (Vertex vertex : vertices.values()) {
+      if (vertex.getParallelism() == -1) {
+        boolean hasInputInititlaizer = false;
+        if (vertex.getDataSources()!= null && !vertex.getDataSources().isEmpty()) {
+          for (DataSourceDescriptor ds : vertex.getDataSources()) {
+            if (ds.getInputInitializerDescriptor() != null) {
+              hasInputInititlaizer = true;
+              break;
+            }
+          }
+        }
+        if (hasInputInititlaizer) {
+          continue;
+        }
+
+        boolean has1to1UninitedSources = false;
+        if (vertex.getInputVertices()!= null && !vertex.getInputVertices().isEmpty()) {
+          for (Vertex srcVertex : vertex.getInputVertices()) {
+            if (srcVertex.getParallelism() == -1) {
+              has1to1UninitedSources = true;
+              break;
+            }
+          }
+        }
+        if (has1to1UninitedSources) {
+          continue;
+        }
+
+        if (vertex.getVertexManagerPlugin() != null) {
+          continue;
+        }
+        throw new IllegalStateException(vertex.getName() +
+            " has -1 tasks but does not have input initializers, " +
+            "1-1 uninited sources or custom vertex manager to set it at runtime");
+      }
+    }
   }
   
   // AnnotatedVertex is used by verify()

http://git-wip-us.apache.org/repos/asf/tez/blob/3da9566d/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
index 8cc55ca..468361b 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
@@ -203,7 +203,7 @@ public class TestTezClientUtils {
 
     ApplicationId appId = ApplicationId.newInstance(1000, 1);
     DAG dag = DAG.create("testdag");
-    dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"))
+    dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
         .setTaskLaunchCmdOpts("initialLaunchOpts"));
     AMConfiguration amConf =
         new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), new Credentials());
@@ -239,7 +239,7 @@ public class TestTezClientUtils {
 
     ApplicationId appId = ApplicationId.newInstance(1000, 1);
     DAG dag = DAG.create("testdag");
-    dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"))
+    dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
         .setTaskLaunchCmdOpts("initialLaunchOpts"));
     AMConfiguration amConf =
         new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), new Credentials());

http://git-wip-us.apache.org/repos/asf/tez/blob/3da9566d/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 4bf594f..74e780c 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -50,7 +50,9 @@ public class TestDAGVerify {
 
   private final String dummyProcessorClassName = TestDAGVerify.class.getName();
   private final String dummyInputClassName = TestDAGVerify.class.getName();
+  private final String dummyInputInitClassName = TestDAGVerify.class.getName();
   private final String dummyOutputClassName = TestDAGVerify.class.getName();
+  private final String dummyVMPluginClassName = TestDAGVerify.class.getName();
   private final int dummyTaskCount = 2;
   private final Resource dummyTaskResource = Resource.newInstance(1, 1);
 
@@ -190,6 +192,9 @@ public class TestDAGVerify {
     Vertex v1 = Vertex.create("v1",
         ProcessorDescriptor.create(dummyProcessorClassName),
         -1, dummyTaskResource);
+    DataSourceDescriptor dsDesc = DataSourceDescriptor.create(InputDescriptor.create(dummyInputClassName),
+        InputInitializerDescriptor.create(dummyInputInitClassName), null);
+    v1.addDataSource("input_1", dsDesc);
     Vertex v2 = Vertex.create("v2",
         ProcessorDescriptor.create("MapProcessor"),
         -1, dummyTaskResource);
@@ -957,7 +962,7 @@ public class TestDAGVerify {
     taskLocationHints.add(TaskLocationHint.createTaskLocationHint(hosts, null));
     VertexLocationHint vLoc = VertexLocationHint.create(taskLocationHints);
     DataSourceDescriptor ds = DataSourceDescriptor.create(InputDescriptor.create("I.class"), 
-        null, dummyTaskCount, null, vLoc, lrs2);
+        InputInitializerDescriptor.create(dummyInputInitClassName), dummyTaskCount, null, vLoc, lrs2);
     v1.addDataSource("i1", ds);
         
     DAG dag = DAG.create("testDag");
@@ -1056,4 +1061,95 @@ public class TestDAGVerify {
     Assert.assertTrue(foundModifyAcls);
   }
 
+  // v1 has input initializer
+  @Test(timeout = 5000)
+  public void testDAGInvalidParallelism1() {
+    DAG dag = DAG.create("testDAG");
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create(dummyProcessorClassName));
+    dag.addVertex(v1);
+    try {
+      dag.verify();
+      Assert.fail();
+    } catch (Exception e) {
+      Assert.assertEquals(
+          "v1 has -1 tasks but does not have input initializers, 1-1 uninited sources or custom vertex manager to set it at runtime",
+          e.getMessage());
+    }
+
+    DataSourceDescriptor dsDesc = DataSourceDescriptor.create(InputDescriptor.create(dummyInputClassName),
+        InputInitializerDescriptor.create(dummyInputInitClassName), null);
+    v1.addDataSource("input_1", dsDesc);
+    dag.verify();
+  }
+
+  // v1 has custom vertex manager
+  @Test(timeout = 5000)
+  public void testDAGInvalidParallelism2() {
+    DAG dag = DAG.create("testDAG");
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create(dummyProcessorClassName));
+    dag.addVertex(v1);
+    try {
+      dag.verify();
+      Assert.fail();
+    } catch (Exception e) {
+      Assert.assertEquals(
+          "v1 has -1 tasks but does not have input initializers, 1-1 uninited sources or custom vertex manager to set it at runtime",
+          e.getMessage());
+    }
+
+    v1.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(dummyVMPluginClassName));
+    dag.verify();
+  }
+
+  // v1 has 1-1 uninited source vertex v0 which has input initializer
+  @Test(timeout = 5000)
+  public void testDAGInvalidParallelism3() {
+    DAG dag = DAG.create("testDAG");
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create(dummyProcessorClassName));
+    dag.addVertex(v1);
+    try {
+      dag.verify();
+      Assert.fail();
+    } catch (Exception e) {
+      Assert.assertEquals(
+          "v1 has -1 tasks but does not have input initializers, 1-1 uninited sources or custom vertex manager to set it at runtime",
+          e.getMessage());
+    }
+
+    Vertex v0 = Vertex.create("v0", ProcessorDescriptor.create(dummyProcessorClassName));
+    DataSourceDescriptor dsDesc = DataSourceDescriptor.create(InputDescriptor.create(dummyInputClassName),
+        InputInitializerDescriptor.create(dummyInputInitClassName), null);
+    v0.addDataSource("input", dsDesc);
+    dag.addVertex(v0);
+    dag.addEdge(Edge.create(v0, v1, EdgeProperty.create(DataMovementType.ONE_TO_ONE,
+        DataSourceType.PERSISTED,SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create(dummyOutputClassName),
+        InputDescriptor.create(dummyInputClassName))));
+    dag.verify();
+  }
+
+  // v1 has a 1-1 uninited parent v0 which has custom vertex manager
+  @Test//(timeout = 5000)
+  public void testDAGInvalidParallelism4() {
+    DAG dag = DAG.create("testDAG");
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create(dummyProcessorClassName));
+    dag.addVertex(v1);
+    try {
+      dag.verify();
+      Assert.fail();
+    } catch (Exception e) {
+      Assert.assertEquals(
+          "v1 has -1 tasks but does not have input initializers, 1-1 uninited sources or custom vertex manager to set it at runtime",
+          e.getMessage());
+    }
+
+    Vertex v0 = Vertex.create("v2", ProcessorDescriptor.create(dummyProcessorClassName));
+    v0.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(dummyVMPluginClassName));
+    dag.addVertex(v0);
+    dag.addEdge(Edge.create(v0, v1, EdgeProperty.create(DataMovementType.ONE_TO_ONE,
+        DataSourceType.PERSISTED,SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create(dummyOutputClassName),
+        InputDescriptor.create(dummyInputClassName))));
+    dag.verify();
+  }
 }