You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ml...@apache.org on 2013/05/15 02:13:29 UTC

git commit: TEZ-31: implement DAG.verify() to validate structural properties of dag created through tez-dag-api

Updated Branches:
  refs/heads/TEZ-1 d660948d7 -> 645c470ee


TEZ-31: implement DAG.verify() to validate structural properties of dag created through tez-dag-api


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/645c470e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/645c470e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/645c470e

Branch: refs/heads/TEZ-1
Commit: 645c470eeb1c4476437498fed639c3c20f57c672
Parents: d660948
Author: mikelid <mi...@microsoft.com>
Authored: Tue May 14 17:13:14 2013 -0700
Committer: mikelid <mi...@microsoft.com>
Committed: Tue May 14 17:13:14 2013 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/tez/dag/api/DAG.java  |  149 +++++++++++-
 .../java/org/apache/tez/dag/api/TestDAGVerify.java |  194 +++++++++++++++
 2 files changed, 340 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/645c470e/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 171921d..300ef31 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Stack;
 
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -87,13 +88,155 @@ public class DAG { // FIXME rename to Topology
     this.name = name;
   }
   
-  public void verify() throws TezException { // FIXME better exception
-    //FIXME are task resources compulsory or will the DAG AM put in a default
-    //for each vertex if not specified?
+  // Per-vertex scratch state used by verify() and its cycle detection.
+  // Static: it never needs a reference to the enclosing DAG instance.
+  private static class AnnotatedVertex {
+    final Vertex v;
+
+    // Tarjan's algorithm state; index == -1 means "not yet visited".
+    int index;
+    int lowlink;
+    boolean onstack;
+
+    // Number of edges entering / leaving this vertex, used by verify()'s restricted-mode check.
+    int inDegree;
+    int outDegree;
+    private AnnotatedVertex(Vertex v){
+      this.v = v;
+      index = -1;
+      lowlink = -1;
+    }
  }
+  
+  /**
+   * Validates the structure of this DAG using the restricted rule set; same as verify(true).
+   *
+   * Rules checked in every mode:
+   *   Illegal:
+   *     - two or more vertices with the same vertex name
+   *     - cycles
+   *   Ok:
+   *     - an orphaned vertex (no edges), e.g. a map-only job
+   *     - islands, e.g. a job made of unrelated workflows
+   *   Not yet categorized (accepted by verify(false)):
+   *     - an orphaned vertex in a DAG of more than one vertex (maybe an unrelated map-only job)
+   *     - two edges v1->v2 (a self-join job might need this); restricted mode rejects it
+   *
+   * Restricted mode (the only DAG shapes supported in the short term) additionally forbids
+   * any vertex with more than one input edge or more than one output edge (n-ary input/merge).
+   *
+   * @throws IllegalStateException if the DAG violates any of the rules above
+   */
+  public void verify() throws IllegalStateException {
+    verify(true);
+  }
+  
+  public void verify(boolean restricted) throws IllegalStateException {
+    Map<String, AnnotatedVertex> vertexMap = new HashMap<String, AnnotatedVertex>(); // keyed by vertex name
+    for(Vertex v : vertices){
+      if(vertexMap.containsKey(v.getVertexName())){
+        throw new IllegalStateException("DAG contains multiple vertices with name: " + v.getVertexName());
+      }
+      vertexMap.put(v.getVertexName(), new AnnotatedVertex(v));
+    }
    
+    // Group edges by source and count degrees; dangling endpoints would otherwise NPE in detectCycles().
+    Map<Vertex, List<Edge>> edgeMap = new HashMap<Vertex, List<Edge>>();
+    for(Edge e : edges){
+      AnnotatedVertex src = vertexMap.get(e.getInputVertex().getVertexName());
+      AnnotatedVertex dst = vertexMap.get(e.getOutputVertex().getVertexName());
+      if(src == null || dst == null){
+        throw new IllegalStateException("Edge references a vertex that was not added to the DAG: " + e.getInputVertex().getVertexName() + " -> " + e.getOutputVertex().getVertexName());
+      }
+      List<Edge> edgeList = edgeMap.get(src.v);
+      if(edgeList == null){
+        edgeList = new ArrayList<Edge>();
+        edgeMap.put(src.v, edgeList);
+      }
+      edgeList.add(e);
+      src.outDegree++;
+      dst.inDegree++;
+    }
+    detectCycles(edgeMap, vertexMap);
+    if(restricted){
+      for(AnnotatedVertex av: vertexMap.values()){
+        if(av.inDegree > 1){
+          throw new IllegalStateException("Vertex has inDegree>1: " + av.v.getVertexName());
+        }
+        if(av.outDegree > 1){
+          throw new IllegalStateException("Vertex has outDegree>1: " + av.v.getVertexName());
+        }
+      }
+    }
+  }
+  
+  // Tarjan's strongly connected components algorithm; a multi-vertex component or a self-loop is a cycle.
+  // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
+  private void detectCycles(Map<Vertex, List<Edge>> edgeMap, Map<String, AnnotatedVertex> vertexMap)
+      throws IllegalStateException{
+    // Counter shared by the whole recursion; a boxed Integer would not work ("++" rebinds the local copy).
+    int[] nextIndex = {0};
+    Stack<AnnotatedVertex> stack = new Stack<AnnotatedVertex>();
+    for(AnnotatedVertex av: vertexMap.values()){
+      if(av.index == -1){
+        assert stack.empty();
+        strongConnect(av, vertexMap, edgeMap, stack, nextIndex);
+      }
+    }
+  }
+
+  // Tarjan DFS step: gives av a unique index, explores its out-edges, throws if av roots a cycle.
+  private void strongConnect(
+          AnnotatedVertex av,
+          Map<String, AnnotatedVertex> vertexMap,
+          Map<Vertex, List<Edge>> edgeMap,
+          Stack<AnnotatedVertex> stack, int[] nextIndex) throws IllegalStateException{
+    av.index = nextIndex[0];
+    av.lowlink = nextIndex[0];
+    nextIndex[0]++;
+    stack.push(av);
+    av.onstack = true;
+    List<Edge> outEdges = edgeMap.get(av.v);
+    if(outEdges != null){
+      for(Edge e : outEdges){
+        AnnotatedVertex outVertex = vertexMap.get(e.getOutputVertex().getVertexName());
+        if(outVertex == av){
+          // A self-loop is a single-vertex component, so the stack check below would miss it.
+          throw new IllegalStateException("DAG contains a cycle: " + av.v.getVertexName() + " <- " + av.v.getVertexName());
+        }
+        if(outVertex.index == -1){
+          strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex);
+          av.lowlink = Math.min(av.lowlink, outVertex.lowlink);
+        }
+        else if(outVertex.onstack){
+          // Edge back into the component being built; the whole cycle is reported by its root.
+          av.lowlink = Math.min(av.lowlink, outVertex.index);
+        }
+      }
+    }
+    if(av.lowlink == av.index){
+      // av roots a component; any vertex still above it on the stack means that component is a cycle.
+      AnnotatedVertex pop = stack.pop();
+      pop.onstack = false;
+      if(pop != av){
+        StringBuilder message = new StringBuilder();
+        message.append(av.v.getVertexName() + " <- ");
+        for( ; pop != av; pop = stack.pop()){
+          message.append(pop.v.getVertexName() + " <- ");
+          pop.onstack = false;
+        }
+        message.append(av.v.getVertexName());
+        throw new IllegalStateException("DAG contains a cycle: " + message);
+      }
+    }
+  }
+ 
+  
   // create protobuf message describing DAG
   public DAGPlan createDag(){
+    
+    verify(true);
+    
     DAGPlan.Builder jobBuilder = DAGPlan.newBuilder();  
 
     jobBuilder.setName(this.name);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/645c470e/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
new file mode 100644
index 0000000..4022604
--- /dev/null
+++ b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
+import org.apache.tez.dag.api.EdgeProperty.SourceType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestDAGVerify {
+  // Class names below are placeholders: verify() only inspects vertex names and edges.
+  private final String dummyProcessorClassName = TestDAGVerify.class.getName();
+  private final String dummyInputClassName = TestDAGVerify.class.getName();
+  private final String dummyOutputClassName = TestDAGVerify.class.getName();
+  private final int dummyTaskCount = 2;
+
+  //    v1
+  //    |
+  //    v2
+  @Test
+  public void testVerify1() {
+    Vertex v1 = new Vertex("v1", dummyProcessorClassName, dummyTaskCount);
+    Vertex v2 = new Vertex("v2", dummyProcessorClassName, dummyTaskCount);
+    Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, dummyInputClassName, dummyOutputClassName));
+    DAG dag = new DAG();
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addEdge(e1);
+    dag.verify();
+  }
+
+  //    v1 <----
+  //      |     ^
+  //       v2   ^
+  //      |  |  ^
+  //    v3    v4
+  @Test
+  public void testCycle1() {
+    IllegalStateException ex=null;
+    Vertex v1 = new Vertex("v1", dummyProcessorClassName, dummyTaskCount);
+    Vertex v2 = new Vertex("v2", dummyProcessorClassName, dummyTaskCount);
+    Vertex v3 = new Vertex("v3", dummyProcessorClassName, dummyTaskCount);
+    Vertex v4 = new Vertex("v4", dummyProcessorClassName, dummyTaskCount);
+    Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, dummyInputClassName, dummyOutputClassName));
+    Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, dummyInputClassName, dummyOutputClassName));
+    Edge e3 = new Edge(v2, v4, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, dummyInputClassName, dummyOutputClassName));
+    Edge e4 = new Edge(v4, v1, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, dummyInputClassName, dummyOutputClassName));
+    DAG dag = new DAG();
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addVertex(v4);
+    dag.addEdge(e1);
+    dag.addEdge(e2);
+    dag.addEdge(e3);
+    dag.addEdge(e4);
+    try{
+      dag.verify();
+    }
+    catch (IllegalStateException e){
+      ex = e;
+    }
+    Assert.assertNotNull(ex);
+    Assert.assertTrue(ex.getMessage().startsWith("DAG contains a cycle"));
+    Assert.assertTrue(ex.getMessage().contains("v4") && !ex.getMessage().contains("v3"));
+  }
+
+  //     v1
+  //      |
+  //    -> v2
+  //    ^  | |
+  //    v3    v4
+  @Test
+  public void testCycle2() {
+    IllegalStateException ex=null;
+    Vertex v1 = new Vertex("v1", dummyProcessorClassName, dummyTaskCount);
+    Vertex v2 = new Vertex("v2", dummyProcessorClassName, dummyTaskCount);
+    Vertex v3 = new Vertex("v3", dummyProcessorClassName, dummyTaskCount);
+    Vertex v4 = new Vertex("v4", dummyProcessorClassName, dummyTaskCount);
+    Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, dummyInputClassName, dummyOutputClassName));
+    Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, dummyInputClassName, dummyOutputClassName));
+    Edge e3 = new Edge(v2, v4, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, dummyInputClassName, dummyOutputClassName));
+    Edge e4 = new Edge(v3, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, dummyInputClassName, dummyOutputClassName));
+    DAG dag = new DAG();
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addVertex(v4);
+    dag.addEdge(e1);
+    dag.addEdge(e2);
+    dag.addEdge(e3);
+    dag.addEdge(e4);
+    try{
+      dag.verify();
+    }
+    catch (IllegalStateException e){
+      ex = e;
+    }
+    Assert.assertNotNull(ex);
+    Assert.assertTrue(ex.getMessage().startsWith("DAG contains a cycle"));
+    Assert.assertTrue(ex.getMessage().contains("v3") && !ex.getMessage().contains("v4"));
+  }
+
+  @Test
+  public void repeatedVertexName() {
+    IllegalStateException ex=null;
+    Vertex v1 = new Vertex("v1", dummyProcessorClassName, dummyTaskCount);
+    Vertex v1repeat = new Vertex("v1", dummyProcessorClassName, dummyTaskCount);
+    DAG dag = new DAG();
+    dag.addVertex(v1);
+    dag.addVertex(v1repeat);
+    try {
+      dag.verify();
+    }
+    catch (IllegalStateException e){
+      ex = e;
+    }
+    Assert.assertNotNull(ex);
+    Assert.assertTrue(ex.getMessage().startsWith("DAG contains multiple vertices with name"));
+    Assert.assertTrue(ex.getMessage().endsWith("v1"));
+  }
+
+  //  v1  v2
+  //   |  |
+  //    v3
+  @Test
+  public void BinaryInput() {
+    IllegalStateException ex=null;
+    try {
+      Vertex v1 = new Vertex("v1", dummyProcessorClassName, dummyTaskCount);
+      Vertex v2 = new Vertex("v2", dummyProcessorClassName, dummyTaskCount);
+      Vertex v3 = new Vertex("v3", dummyProcessorClassName, dummyTaskCount);
+      Edge e1 = new Edge(v1, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, dummyInputClassName, dummyOutputClassName));
+      Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, dummyInputClassName, dummyOutputClassName));
+      DAG dag = new DAG();
+      dag.addVertex(v1);
+      dag.addVertex(v2);
+      dag.addVertex(v3);
+      dag.addEdge(e1);
+      dag.addEdge(e2);
+      dag.verify();
+    }
+    catch (IllegalStateException e){
+      ex = e;
+    }
+    Assert.assertNotNull(ex);
+    Assert.assertTrue(ex.getMessage().startsWith("Vertex has inDegree>1"));
+    Assert.assertTrue(ex.getMessage().endsWith("v3"));
+  }
+
+  //   v1
+  //  |  |
+  //  v2  v3
+  @Test
+  public void BinaryOutput() {
+    IllegalStateException ex=null;
+    try {
+      Vertex v1 = new Vertex("v1", dummyProcessorClassName, dummyTaskCount);
+      Vertex v2 = new Vertex("v2", dummyProcessorClassName, dummyTaskCount);
+      Vertex v3 = new Vertex("v3", dummyProcessorClassName, dummyTaskCount);
+      Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, dummyInputClassName, dummyOutputClassName));
+      Edge e2 = new Edge(v1, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, dummyInputClassName, dummyOutputClassName));
+      DAG dag = new DAG();
+      dag.addVertex(v1);
+      dag.addVertex(v2);
+      dag.addVertex(v3);
+      dag.addEdge(e1);
+      dag.addEdge(e2);
+      dag.verify();
+    }
+    catch (IllegalStateException e){
+      ex = e;
+    }
+    Assert.assertNotNull(ex);
+    Assert.assertTrue(ex.getMessage().startsWith("Vertex has outDegree>1"));
+    Assert.assertTrue(ex.getMessage().endsWith("v1"));
+  }
+}