You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/05/28 23:37:19 UTC

git commit: TEZ-800. One-one edge with parallelism -1 fails if source vertex parallelism is not -1 as well (bikas)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 0ccc8e0c7 -> ae267de54


TEZ-800. One-one edge with parallelism -1 fails if source vertex parallelism is not -1 as well (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/ae267de5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/ae267de5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/ae267de5

Branch: refs/heads/master
Commit: ae267de541cc2014c770f34cf0e117c7e003b28c
Parents: 0ccc8e0
Author: Bikas Saha <bi...@apache.org>
Authored: Wed May 28 14:37:11 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed May 28 14:37:11 2014 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/tez/dag/api/DAG.java   |  58 ++++-
 .../java/org/apache/tez/dag/api/Vertex.java     |   6 +-
 .../org/apache/tez/dag/api/TestDAGVerify.java   | 224 +++++++++++++++++++
 .../examples/BroadcastAndOneToOneExample.java   |   2 +-
 4 files changed, 286 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae267de5/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 4483e20..c49ac79 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -236,7 +236,59 @@ public class DAG { // FIXME rename to Topology
       }
     }
   }
-
+  
+  void checkAndInferOneToOneParallelism() {
+    // infer all 1-1 via dependencies
+    // collect all 1-1 edges where the source parallelism is set
+    Set<Vertex> newKnownTasksVertices = Sets.newHashSet();
+    for (Vertex vertex : vertices.values()) {
+      if (vertex.getParallelism() > -1) {
+        newKnownTasksVertices.add(vertex);
+      }
+    }
+    
+    // walk through all known source 1-1 edges and infer parallelism
+    // add newly inferred vertices for consideration as known sources
+    // the outer loop will run for every new level of inferring the parallelism
+    // however, the entire logic will process each vertex only once
+    while(!newKnownTasksVertices.isEmpty()) {
+      Set<Vertex> knownTasksVertices = Sets.newHashSet(newKnownTasksVertices);
+      newKnownTasksVertices.clear();
+      for (Vertex v : knownTasksVertices) {
+        for (Edge e : v.getOutputEdges()) {
+          if (e.getEdgeProperty().getDataMovementType() == DataMovementType.ONE_TO_ONE) {
+            Vertex outVertex = e.getOutputVertex();
+            if (outVertex.getParallelism() == -1) {
+              LOG.info("Inferring parallelism for vertex: "
+                  + outVertex.getVertexName() + " to be " + v.getParallelism()
+                  + " from 1-1 connection with vertex " + v.getVertexName());
+              outVertex.setParallelism(v.getParallelism());
+              newKnownTasksVertices.add(outVertex);
+            }
+          }
+        }
+      }
+    }
+    
+    // check for inconsistency and errors
+    for (Edge e : edges) {
+      Vertex inputVertex = e.getInputVertex();
+      Vertex outputVertex = e.getOutputVertex();
+      
+      if (e.getEdgeProperty().getDataMovementType() == DataMovementType.ONE_TO_ONE) {
+        if (inputVertex.getParallelism() != outputVertex.getParallelism()) {
+          // a mismatch is tolerated only while the destination is still unset (-1).
+          if (outputVertex.getParallelism() != -1) {
+            throw new TezUncheckedException(
+                "1-1 Edge. Destination vertex parallelism must match source vertex. "
+                + "Vertex: " + inputVertex.getVertexName() + " does not match vertex: " 
+                + outputVertex.getVertexName());
+          }
+        }
+      }
+    }
+  }
+  
   // AnnotatedVertex is used by verify()
   private static class AnnotatedVertex {
     Vertex v;
@@ -300,7 +352,7 @@ public class DAG { // FIXME rename to Topology
     for (Edge e : edges) {
       // Construct structure for cycle detection
       Vertex inputVertex = e.getInputVertex();
-      Vertex outputVertex = e.getOutputVertex();
+      Vertex outputVertex = e.getOutputVertex();      
       List<Edge> edgeList = edgeMap.get(inputVertex);
       if (edgeList == null) {
         edgeList = new ArrayList<Edge>();
@@ -378,6 +430,8 @@ public class DAG { // FIXME rename to Topology
     // within the addInput / addOutput call itself.
 
     detectCycles(edgeMap, vertexMap);
+    
+    checkAndInferOneToOneParallelism();
 
     if (restricted) {
       for (Edge e : edges) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae267de5/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index b05a63c..1be75a2 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -41,7 +41,7 @@ public class Vertex {
   private final String vertexName;
   private final ProcessorDescriptor processorDescriptor;
 
-  private final int parallelism;
+  private int parallelism;
   private VertexLocationHint taskLocationsHint;
   private final Resource taskResource;
   private Map<String, LocalResource> taskLocalResources = new HashMap<String, LocalResource>();
@@ -89,6 +89,10 @@ public class Vertex {
   public int getParallelism() {
     return parallelism;
   }
+  
+  void setParallelism(int parallelism) {
+    this.parallelism = parallelism;
+  }
 
   public Resource getTaskResource() {
     return taskResource;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae267de5/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 7d55c39..4b6dfc6 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -97,6 +97,179 @@ public class TestDAGVerify {
     dag.verify();
   }
   
+  @Test
+  // v1 (known) -> v2 (-1) -> v3 (-1)
+  public void testVerifyOneToOneInferParallelism() {
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor(dummyProcessorClassName),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v2 = new Vertex("v2",
+        new ProcessorDescriptor("MapProcessor"),
+        -1, dummyTaskResource);
+    Vertex v3 = new Vertex("v3",
+        new ProcessorDescriptor("MapProcessor"),
+        -1, dummyTaskResource);
+    Edge e1 = new Edge(v1, v2,
+        new EdgeProperty(DataMovementType.ONE_TO_ONE, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor(dummyOutputClassName),
+            new InputDescriptor(dummyInputClassName)));
+    Edge e2 = new Edge(v2, v3,
+        new EdgeProperty(DataMovementType.ONE_TO_ONE, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor(dummyOutputClassName),
+            new InputDescriptor(dummyInputClassName)));
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addEdge(e1);
+    dag.addEdge(e2);
+    dag.verify();
+    Assert.assertEquals(dummyTaskCount, v2.getParallelism());
+    Assert.assertEquals(dummyTaskCount, v3.getParallelism());
+  }
+  
+  @Test
+  // v1 (known) -> v2 (-1) -> v3 (-1)
+  // The test checks resiliency to ordering of the vertices/edges
+  public void testVerifyOneToOneInferParallelismReverseOrder() {
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor(dummyProcessorClassName),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v2 = new Vertex("v2",
+        new ProcessorDescriptor("MapProcessor"),
+        -1, dummyTaskResource);
+    Vertex v3 = new Vertex("v3",
+        new ProcessorDescriptor("MapProcessor"),
+        -1, dummyTaskResource);
+    Edge e1 = new Edge(v1, v2,
+        new EdgeProperty(DataMovementType.ONE_TO_ONE, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor(dummyOutputClassName),
+            new InputDescriptor(dummyInputClassName)));
+    Edge e2 = new Edge(v2, v3,
+        new EdgeProperty(DataMovementType.ONE_TO_ONE, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor(dummyOutputClassName),
+            new InputDescriptor(dummyInputClassName)));
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v3);
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addEdge(e2);
+    dag.addEdge(e1);
+    dag.verify();
+    Assert.assertEquals(dummyTaskCount, v2.getParallelism());
+    Assert.assertEquals(dummyTaskCount, v3.getParallelism());
+  }
+  
+  @Test  
+  public void testVerifyOneToOneNoInferParallelism() {
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor(dummyProcessorClassName),
+        -1, dummyTaskResource);
+    Vertex v2 = new Vertex("v2",
+        new ProcessorDescriptor("MapProcessor"),
+        -1, dummyTaskResource);
+    Edge e1 = new Edge(v1, v2,
+        new EdgeProperty(DataMovementType.ONE_TO_ONE, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor(dummyOutputClassName),
+            new InputDescriptor(dummyInputClassName)));
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addEdge(e1);
+    dag.verify();
+    Assert.assertEquals(-1, v2.getParallelism());
+  }
+  
+  @Test  
+  // v1 (-1) -> v3 (-1), v2 (known) -> v3 (-1)
+  public void testVerifyOneToOneIncorrectParallelism1() {
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor(dummyProcessorClassName),
+        -1, dummyTaskResource);
+    Vertex v2 = new Vertex("v2",
+        new ProcessorDescriptor(dummyProcessorClassName),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v3 = new Vertex("v3",
+        new ProcessorDescriptor("MapProcessor"),
+        -1, dummyTaskResource);
+    Edge e1 = new Edge(v1, v3,
+        new EdgeProperty(DataMovementType.ONE_TO_ONE, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor(dummyOutputClassName),
+            new InputDescriptor(dummyInputClassName)));
+    Edge e2 = new Edge(v2, v3,
+        new EdgeProperty(DataMovementType.ONE_TO_ONE, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor(dummyOutputClassName),
+            new InputDescriptor(dummyInputClassName)));
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addEdge(e1);
+    dag.addEdge(e2);
+    try {
+      dag.verify();
+      Assert.assertTrue(false);
+    } catch (TezUncheckedException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          "1-1 Edge. Destination vertex parallelism must match source vertex"));
+    }
+  }
+
+  @Test
+  // v1 (-1) -> v4 (-1), v2 (known) -> v4 (-1), v3 (-1) -> v4 (-1)
+  // order of edges should not matter
+  public void testVerifyOneToOneIncorrectParallelism2() {
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor(dummyProcessorClassName),
+        -1, dummyTaskResource);
+    Vertex v2 = new Vertex("v2",
+        new ProcessorDescriptor(dummyProcessorClassName),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v3 = new Vertex("v3",
+        new ProcessorDescriptor(dummyProcessorClassName),
+        -1, dummyTaskResource);
+    Vertex v4 = new Vertex("v4",
+        new ProcessorDescriptor(dummyProcessorClassName),
+        -1, dummyTaskResource);
+    Edge e1 = new Edge(v1, v4,
+        new EdgeProperty(DataMovementType.ONE_TO_ONE, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor(dummyOutputClassName),
+            new InputDescriptor(dummyInputClassName)));
+    Edge e2 = new Edge(v2, v4,
+        new EdgeProperty(DataMovementType.ONE_TO_ONE, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor(dummyOutputClassName),
+            new InputDescriptor(dummyInputClassName)));
+    Edge e3 = new Edge(v3, v4,
+        new EdgeProperty(DataMovementType.ONE_TO_ONE, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor(dummyOutputClassName),
+            new InputDescriptor(dummyInputClassName)));
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addVertex(v4);
+    dag.addEdge(e1);
+    dag.addEdge(e2);
+    dag.addEdge(e3);
+    try {
+      dag.verify();
+      Assert.assertTrue(false);
+    } catch (TezUncheckedException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          "1-1 Edge. Destination vertex parallelism must match source vertex"));
+    }
+  }
+  
   @Test  
   public void testVerifyBroadcast() {
     Vertex v1 = new Vertex("v1",
@@ -544,6 +717,57 @@ public class TestDAGVerify {
     Assert.assertTrue(v5.getGroupInputs().containsKey(groupName2));
     Assert.assertEquals(2, dag.vertexGroups.size());
   }
+  
+  @Test
+  public void testVertexGroupOneToOne() {
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor("Processor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v2 = new Vertex("v2",
+        new ProcessorDescriptor("Processor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v3 = new Vertex("v3",
+        new ProcessorDescriptor("Processor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v4 = new Vertex("v4",
+        new ProcessorDescriptor("Processor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v5 = new Vertex("v5",
+        new ProcessorDescriptor("Processor"),
+        -1, dummyTaskResource);
+    
+    DAG dag = new DAG("testDag");
+    String groupName1 = "uv12";
+    VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
+    OutputDescriptor outDesc = new OutputDescriptor();
+    uv12.addOutput("uvOut", outDesc, null);
+    
+    String groupName2 = "uv23";
+    VertexGroup uv23 = dag.createVertexGroup(groupName2, v2, v3);
+    
+    GroupInputEdge e1 = new GroupInputEdge(uv12, v4,
+        new EdgeProperty(DataMovementType.ONE_TO_ONE, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")),
+            new InputDescriptor("dummy input class"));
+    GroupInputEdge e2 = new GroupInputEdge(uv23, v5,
+        new EdgeProperty(DataMovementType.ONE_TO_ONE, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")),
+            new InputDescriptor("dummy input class"));
+    
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addVertex(v4);
+    dag.addVertex(v5);
+    dag.addEdge(e1);
+    dag.addEdge(e2);
+    dag.verify();
+    Assert.assertEquals(dummyTaskCount, v5.getParallelism());
+  }
 
   //   v1
   //  |  |

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae267de5/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index 0cd6aaf..3bb4223 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -183,7 +183,7 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
     Vertex oneToOneVertex = new Vertex("OneToOne",
         new ProcessorDescriptor(
             OneToOneProcessor.class.getName()).setUserPayload(procPayload),
-            numOneToOneTasks, MRHelpers.getReduceResource(kvOneToOneConf));
+            -1, MRHelpers.getReduceResource(kvOneToOneConf));
     oneToOneVertex.setJavaOpts(
         MRHelpers.getReduceJavaOpts(kvOneToOneConf)).setVertexManagerPlugin(
             new VertexManagerPluginDescriptor(InputReadyVertexManager.class.getName()));