You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/05/29 20:37:33 UTC

git commit: TEZ-1143 (addendum). 1-1 source split event should be handled in Vertex.RUNNING and Vertex.INITED state (bikas)

Repository: incubator-tez
Updated Branches:
  refs/heads/master acd0a46e3 -> f96a53d2d


TEZ-1143 (addendum). 1-1 source split event should be handled in Vertex.RUNNING and Vertex.INITED state (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/f96a53d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/f96a53d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/f96a53d2

Branch: refs/heads/master
Commit: f96a53d2d423be1d0e55a0a422a55b7410f024c6
Parents: acd0a46
Author: Bikas Saha <bi...@apache.org>
Authored: Thu May 29 11:37:24 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu May 29 11:37:24 2014 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  5 ++-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 39 +++++++++++++++++++-
 2 files changed, 41 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f96a53d2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 1cf63cb..0b46fbb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -2621,8 +2621,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           " numTasks " + splitEvent.getNumTasks());
       vertex.originalOneToOneSplitSource = originalSplitSource;
       vertex.setParallelism(splitEvent.getNumTasks(), null, null);
-      if (vertex.getState() == VertexState.RUNNING) {
-        return VertexState.RUNNING;
+      if (vertex.getState() == VertexState.RUNNING || 
+          vertex.getState() == VertexState.INITED) {
+        return vertex.getState();
       } else {
         Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING,
             " Unexpected 1-1 split for vertex " + vertex.getVertexId() +

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f96a53d2/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 05c6569..663e23c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2234,7 +2234,6 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex2").getState());
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex3").getState());
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
-    System.out.println("xxx");
     // change parallelism
     int newNumTasks = 3;
     v1.setParallelism(newNumTasks, null, null);
@@ -2247,6 +2246,44 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex3").getState());
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
   }
+  
+  @Test(timeout = 5000)
+  public void testVertexWithOneToOneSplitWhileInited() {
+    int numTasks = 5;
+    // create a diamond shaped dag with 1-1 edges. 
+    setupPreDagCreation();
+    dagPlan = createDAGPlanForOneToOneSplit(null, numTasks);
+    setupPostDagCreation();
+    VertexImpl v1 = vertices.get("vertex1");
+    initAllVertices(VertexState.INITED);
+    
+    // fudge vertex manager so that tasks dont start running
+    v1.vertexManager = new VertexManager(new VertexManagerPluginForTest(),
+        v1, appContext);
+    
+    Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks());
+    Assert.assertEquals(numTasks, vertices.get("vertex3").getTotalTasks());
+    Assert.assertEquals(numTasks, vertices.get("vertex4").getTotalTasks());
+    // change parallelism
+    int newNumTasks = 3;
+    v1.setParallelism(newNumTasks, null, null);
+    dispatcher.await();
+    Assert.assertEquals(newNumTasks, vertices.get("vertex2").getTotalTasks());
+    Assert.assertEquals(newNumTasks, vertices.get("vertex3").getTotalTasks());
+    Assert.assertEquals(newNumTasks, vertices.get("vertex4").getTotalTasks());
+    Assert.assertEquals(VertexState.INITED, vertices.get("vertex1").getState());
+    Assert.assertEquals(VertexState.INITED, vertices.get("vertex2").getState());
+    Assert.assertEquals(VertexState.INITED, vertices.get("vertex3").getState());
+    Assert.assertEquals(VertexState.INITED, vertices.get("vertex4").getState());
+
+    startVertex(v1);
+    dispatcher.await();
+
+    Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex1").getState());
+    Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex2").getState());
+    Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex3").getState());
+    Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
+  }
 
   @Test(timeout = 5000)
   public void testHistoryEventGeneration() {