You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ml...@apache.org on 2013/05/15 02:13:29 UTC
git commit: TEZ-31: implement DAG.verify() to validate structural
properties of dag created through tez-dag-api
Updated Branches:
refs/heads/TEZ-1 d660948d7 -> 645c470ee
TEZ-31: implement DAG.verify() to validate structural properties of dag created through tez-dag-api
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/645c470e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/645c470e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/645c470e
Branch: refs/heads/TEZ-1
Commit: 645c470eeb1c4476437498fed639c3c20f57c672
Parents: d660948
Author: mikelid <mi...@microsoft.com>
Authored: Tue May 14 17:13:14 2013 -0700
Committer: mikelid <mi...@microsoft.com>
Committed: Tue May 14 17:13:14 2013 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/tez/dag/api/DAG.java | 149 +++++++++++-
.../java/org/apache/tez/dag/api/TestDAGVerify.java | 194 +++++++++++++++
2 files changed, 340 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/645c470e/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 171921d..300ef31 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Stack;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -87,13 +88,155 @@ public class DAG { // FIXME rename to Topology
this.name = name;
}
- public void verify() throws TezException { // FIXME better exception
- //FIXME are task resources compulsory or will the DAG AM put in a default
- //for each vertex if not specified?
+  /**
+   * Per-vertex bookkeeping used only while verifying the DAG: the state for
+   * Tarjan's strongly-connected-components algorithm plus incoming/outgoing
+   * edge counts for the restricted-mode checks.
+   *
+   * Declared static because it never touches the enclosing DAG; a non-static
+   * inner class would carry a hidden reference to it in every instance.
+   */
+  private static class AnnotatedVertex {
+    // The vertex being annotated.
+    final Vertex v;
+
+    // Tarjan's algorithm state. index == -1 means "not yet visited".
+    int index;
+    int lowlink;
+    boolean onstack;
+
+    // Number of edges ending at / starting from this vertex.
+    int inDegree;
+    int outDegree;
+
+    private AnnotatedVertex(Vertex v) {
+      this.v = v;
+      this.index = -1;
+      this.lowlink = -1;
+      this.onstack = false;
+      this.inDegree = 0;
+      this.outDegree = 0;
+    }
   }
+
+  /**
+   * Verifies the structural properties of this DAG using the default rules,
+   * i.e. restricted mode. Equivalent to {@code verify(true)}.
+   *
+   * <p>Always illegal:
+   * <ul>
+   *   <li>two or more vertices sharing the same name</li>
+   *   <li>cycles</li>
+   * </ul>
+   *
+   * <p>Allowed:
+   * <ul>
+   *   <li>an orphaned vertex (occurs in a map-only job)</li>
+   *   <li>islands (occur when a job contains unrelated workflows)</li>
+   * </ul>
+   *
+   * <p>Not yet categorized:
+   * <ul>
+   *   <li>an orphaned vertex in a DAG of more than one vertex (possibly an
+   *       unrelated map-only job)</li>
+   *   <li>v1-&gt;v2 connected by two edges (a self-join job might need this)</li>
+   * </ul>
+   *
+   * <p>Restricted mode: in the short term only a limited set of DAG shapes is
+   * supported, so restricted mode additionally rejects any vertex with more
+   * than one input edge or more than one output edge (n-ary input, n-ary merge).
+   *
+   * @throws IllegalStateException if any rule above is violated
+   */
+  public void verify() throws IllegalStateException {
+    final boolean restrictedByDefault = true;
+    verify(restrictedByDefault);
+  }
+
+  /**
+   * Verifies the structural properties of this DAG.
+   *
+   * @param restricted if true, additionally reject any vertex that has more
+   *        than one incoming edge or more than one outgoing edge
+   * @throws IllegalStateException if two vertices share a name, an edge refers
+   *         to a vertex that was not added to this DAG, the graph contains a
+   *         cycle, or (restricted mode only) a degree limit is exceeded
+   */
+  public void verify(boolean restricted) throws IllegalStateException {
+    // Index vertices by name, rejecting duplicates, and prepare the per-vertex
+    // state used by cycle detection.
+    Map<String, AnnotatedVertex> vertexMap = new HashMap<String, AnnotatedVertex>();
+    for (Vertex v : vertices) {
+      if (vertexMap.containsKey(v.getVertexName())) {
+        throw new IllegalStateException("DAG contains multiple vertices with name: " + v.getVertexName());
+      }
+      vertexMap.put(v.getVertexName(), new AnnotatedVertex(v));
+    }
+
+    // Group edges by source vertex. Endpoints are resolved through vertexMap so
+    // that an edge to an unregistered vertex fails here with a clear message
+    // (rather than a NullPointerException later), and so that the map is keyed
+    // by the registered Vertex instance, which is what detectCycles() looks up.
+    Map<Vertex, List<Edge>> edgeMap = new HashMap<Vertex, List<Edge>>();
+    for (Edge e : edges) {
+      AnnotatedVertex source = resolveEndpoint(vertexMap, e.getInputVertex());
+      resolveEndpoint(vertexMap, e.getOutputVertex());
+      List<Edge> edgeList = edgeMap.get(source.v);
+      if (edgeList == null) {
+        edgeList = new ArrayList<Edge>();
+        edgeMap.put(source.v, edgeList);
+      }
+      edgeList.add(e);
+    }
+
+    detectCycles(edgeMap, vertexMap);
+
+    if (restricted) {
+      for (Edge e : edges) {
+        vertexMap.get(e.getInputVertex().getVertexName()).outDegree++;
+        vertexMap.get(e.getOutputVertex().getVertexName()).inDegree++;
+      }
+      // Walk vertices in their original iteration order (rather than HashMap
+      // order) so the reported violation does not depend on hashing.
+      for (Vertex v : vertices) {
+        AnnotatedVertex av = vertexMap.get(v.getVertexName());
+        if (av.inDegree > 1) {
+          throw new IllegalStateException("Vertex has inDegree>1: " + av.v.getVertexName());
+        }
+        if (av.outDegree > 1) {
+          throw new IllegalStateException("Vertex has outDegree>1: " + av.v.getVertexName());
+        }
+      }
+    }
+  }
+
+  // Maps an edge endpoint to its AnnotatedVertex, failing if it was never added to this DAG.
+  private AnnotatedVertex resolveEndpoint(Map<String, AnnotatedVertex> vertexMap, Vertex endpoint) {
+    AnnotatedVertex av = (endpoint == null) ? null : vertexMap.get(endpoint.getVertexName());
+    if (av == null) {
+      throw new IllegalStateException("Edge references a vertex that is not part of the DAG: "
+          + (endpoint == null ? "null" : endpoint.getVertexName()));
+    }
+    return av;
+  }
+
+  /**
+   * Throws if the graph contains a cycle. Uses Tarjan's strongly connected
+   * components algorithm: any component with more than one vertex is a cycle;
+   * self-loops are checked explicitly.
+   * http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
+   *
+   * @param edgeMap outgoing edges keyed by source vertex
+   * @param vertexMap per-vertex state keyed by vertex name; every index must be
+   *        -1 (unvisited) on entry
+   * @throws IllegalStateException describing the first cycle found
+   */
+  private void detectCycles(Map<Vertex, List<Edge>> edgeMap, Map<String, AnnotatedVertex> vertexMap)
+      throws IllegalStateException {
+    // A single-element array is used as a counter shared by all recursive calls.
+    // A boxed Integer does not work: Integer is immutable, so incrementing it in
+    // strongConnect() would only rebind that call's local parameter, producing
+    // duplicate DFS indices.
+    int[] nextIndex = new int[] {0};
+    Stack<AnnotatedVertex> stack = new Stack<AnnotatedVertex>();
+    for (AnnotatedVertex av : vertexMap.values()) {
+      if (av.index == -1) {
+        assert stack.empty();
+        strongConnect(av, vertexMap, edgeMap, stack, nextIndex);
+      }
+    }
+  }
+
+  /**
+   * Recursive step of Tarjan's algorithm: visits {@code av} and everything
+   * reachable from it, throwing as soon as a cycle is found.
+   *
+   * @param nextIndex single-element counter holding the next DFS index to assign
+   */
+  private void strongConnect(
+      AnnotatedVertex av,
+      Map<String, AnnotatedVertex> vertexMap,
+      Map<Vertex, List<Edge>> edgeMap,
+      Stack<AnnotatedVertex> stack, int[] nextIndex) throws IllegalStateException {
+    av.index = nextIndex[0];
+    av.lowlink = nextIndex[0];
+    nextIndex[0]++;
+    stack.push(av);
+    av.onstack = true;
+
+    // Named outEdges to avoid shadowing the DAG's "edges" field.
+    List<Edge> outEdges = edgeMap.get(av.v);
+    if (outEdges != null) {
+      for (Edge e : outEdges) {
+        AnnotatedVertex outVertex = vertexMap.get(e.getOutputVertex().getVertexName());
+        if (outVertex == av) {
+          // A self-loop is a cycle, but Tarjan's algorithm reports it as a
+          // single-vertex component, so it has to be caught explicitly.
+          throw new IllegalStateException("DAG contains a cycle: "
+              + av.v.getVertexName() + " <- " + av.v.getVertexName());
+        }
+        if (outVertex.index == -1) {
+          strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex);
+          av.lowlink = Math.min(av.lowlink, outVertex.lowlink);
+        } else if (outVertex.onstack) {
+          // Strongly connected component detected, but wait until its root is
+          // reached so the full cycle can be displayed. Update lowlink in case
+          // outVertex should be considered the root of this component.
+          av.lowlink = Math.min(av.lowlink, outVertex.index);
+        }
+      }
+    }
+
+    if (av.lowlink == av.index) {
+      AnnotatedVertex popped = stack.pop();
+      popped.onstack = false;
+      if (popped != av) {
+        // Something other than av was above it on the stack, so the component
+        // rooted at av has more than one vertex: it comprises everything from
+        // the top of the stack down to av.
+        StringBuilder message = new StringBuilder();
+        message.append(av.v.getVertexName()).append(" <- ");
+        while (popped != av) {
+          message.append(popped.v.getVertexName()).append(" <- ");
+          popped = stack.pop();
+          popped.onstack = false;
+        }
+        message.append(av.v.getVertexName());
+        throw new IllegalStateException("DAG contains a cycle: " + message);
+      }
+    }
+  }
+
+
// create protobuf message describing DAG
public DAGPlan createDag(){
+
+ verify(true);
+
DAGPlan.Builder jobBuilder = DAGPlan.newBuilder();
jobBuilder.setName(this.name);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/645c470e/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
new file mode 100644
index 0000000..4022604
--- /dev/null
+++ b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
+import org.apache.tez.dag.api.EdgeProperty.SourceType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestDAGVerify {
+
+ private final String dummyProcessorClassName = TestDAGVerify.class.getName();
+ private final String dummyInputClassName = TestDAGVerify.class.getName();
+ private final String dummyOutputClassName = TestDAGVerify.class.getName();
+ private final int dummyTaskCount = 2;
+
+ // v1
+ // |
+ // v2
+ @Test
+ public void testVerify1() {
+ Vertex v1 = new Vertex("v1",dummyProcessorClassName, dummyTaskCount);
+ Vertex v2 = new Vertex("v2","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
+ Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, dummyInputClassName, dummyOutputClassName));
+ DAG dag = new DAG();
+ dag.addVertex(v1);
+ dag.addVertex(v2);
+ dag.addEdge(e1);
+ dag.verify();
+ }
+
+ // v1 <----
+ // | ^
+ // v2 ^
+ // | | ^
+ // v3 v4
+ @Test
+ public void testCycle1() {
+ IllegalStateException ex=null;
+ Vertex v1 = new Vertex("v1","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
+ Vertex v2 = new Vertex("v2","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
+ Vertex v3 = new Vertex("v3","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
+ Vertex v4 = new Vertex("v4","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
+ Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
+ Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
+ Edge e3 = new Edge(v2, v4, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
+ Edge e4 = new Edge(v4, v1, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
+ DAG dag = new DAG();
+ dag.addVertex(v1);
+ dag.addVertex(v2);
+ dag.addVertex(v3);
+ dag.addVertex(v4);
+ dag.addEdge(e1);
+ dag.addEdge(e2);
+ dag.addEdge(e3);
+ dag.addEdge(e4);
+ try{
+ dag.verify();
+ }
+ catch (IllegalStateException e){
+ ex = e;
+ }
+ Assert.assertNotNull(ex);
+ System.out.println(ex.getMessage());
+ Assert.assertTrue(ex.getMessage().startsWith("DAG contains a cycle"));
+ }
+
+// v1
+// |
+// -> v2
+// ^ | |
+// v3 v4
+ @Test
+ public void testCycle2() {
+ IllegalStateException ex=null;
+ Vertex v1 = new Vertex("v1","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
+ Vertex v2 = new Vertex("v2","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
+ Vertex v3 = new Vertex("v3","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
+ Vertex v4 = new Vertex("v4","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
+ Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
+ Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
+ Edge e3 = new Edge(v2, v4, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
+ Edge e4 = new Edge(v3, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
+ DAG dag = new DAG();
+ dag.addVertex(v1);
+ dag.addVertex(v2);
+ dag.addVertex(v3);
+ dag.addVertex(v4);
+ dag.addEdge(e1);
+ dag.addEdge(e2);
+ dag.addEdge(e3);
+ dag.addEdge(e4);
+ try{
+ dag.verify();
+ }
+ catch (IllegalStateException e){
+ ex = e;
+ }
+ Assert.assertNotNull(ex);
+ System.out.println(ex.getMessage());
+ Assert.assertTrue(ex.getMessage().startsWith("DAG contains a cycle"));
+ }
+
+ @Test
+ public void repeatedVertexName() {
+ IllegalStateException ex=null;
+ Vertex v1 = new Vertex("v1","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
+ Vertex v1repeat = new Vertex("v1","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
+ DAG dag = new DAG();
+ dag.addVertex(v1);
+ dag.addVertex(v1repeat);
+ try {
+ dag.verify();
+ }
+ catch (IllegalStateException e){
+ ex = e;
+ }
+ Assert.assertNotNull(ex);
+ System.out.println(ex.getMessage());
+ Assert.assertTrue(ex.getMessage().startsWith("DAG contains multiple vertices with name"));
+ }
+
+ // v1 v2
+ // | |
+ // v3
+ @Test
+ public void BinaryInput() {
+ IllegalStateException ex=null;
+ try {
+ Vertex v1 = new Vertex("v1","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
+ Vertex v2 = new Vertex("v2","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
+ Vertex v3 = new Vertex("v3","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
+ Edge e1 = new Edge(v1, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
+ Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
+ DAG dag = new DAG();
+ dag.addVertex(v1);
+ dag.addVertex(v2);
+ dag.addVertex(v3);
+ dag.addEdge(e1);
+ dag.addEdge(e2);
+ dag.verify();
+ }
+ catch (IllegalStateException e){
+ ex = e;
+ }
+ Assert.assertNotNull(ex);
+ System.out.println(ex.getMessage());
+ Assert.assertTrue(ex.getMessage().startsWith("Vertex has inDegree>1"));
+ }
+
+ // v1
+ // | |
+ // v2 v3
+ @Test
+ public void BinaryOutput() {
+ IllegalStateException ex=null;
+ try {
+ Vertex v1 = new Vertex("v1","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
+ Vertex v2 = new Vertex("v2","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
+ Vertex v3 = new Vertex("v3","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
+ Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
+ Edge e2 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
+ DAG dag = new DAG();
+ dag.addVertex(v1);
+ dag.addVertex(v2);
+ dag.addVertex(v3);
+ dag.addEdge(e1);
+ dag.addEdge(e2);
+ dag.verify();
+ }
+ catch (IllegalStateException e){
+ ex = e;
+ }
+ Assert.assertNotNull(ex);
+ System.out.println(ex.getMessage());
+ Assert.assertTrue(ex.getMessage().startsWith("Vertex has outDegree>1"));
+ }
+}