You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/05/29 20:37:33 UTC
git commit: TEZ-1143 (addendum). 1-1 source split event should be handled in Vertex.RUNNING and Vertex.INITED state (bikas)
Repository: incubator-tez
Updated Branches:
refs/heads/master acd0a46e3 -> f96a53d2d
TEZ-1143 (addendum). 1-1 source split event should be handled in Vertex.RUNNING and Vertex.INITED state (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/f96a53d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/f96a53d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/f96a53d2
Branch: refs/heads/master
Commit: f96a53d2d423be1d0e55a0a422a55b7410f024c6
Parents: acd0a46
Author: Bikas Saha <bi...@apache.org>
Authored: Thu May 29 11:37:24 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu May 29 11:37:24 2014 -0700
----------------------------------------------------------------------
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 5 ++-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 39 +++++++++++++++++++-
2 files changed, 41 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f96a53d2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 1cf63cb..0b46fbb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -2621,8 +2621,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
" numTasks " + splitEvent.getNumTasks());
vertex.originalOneToOneSplitSource = originalSplitSource;
vertex.setParallelism(splitEvent.getNumTasks(), null, null);
- if (vertex.getState() == VertexState.RUNNING) {
- return VertexState.RUNNING;
+ if (vertex.getState() == VertexState.RUNNING ||
+ vertex.getState() == VertexState.INITED) {
+ return vertex.getState();
} else {
Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING,
" Unexpected 1-1 split for vertex " + vertex.getVertexId() +
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f96a53d2/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 05c6569..663e23c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2234,7 +2234,6 @@ public class TestVertexImpl {
Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex2").getState());
Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex3").getState());
Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
- System.out.println("xxx");
// change parallelism
int newNumTasks = 3;
v1.setParallelism(newNumTasks, null, null);
@@ -2247,6 +2246,44 @@ public class TestVertexImpl {
Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex3").getState());
Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
}
+
+ @Test(timeout = 5000)
+ public void testVertexWithOneToOneSplitWhileInited() {
+ int numTasks = 5;
+ // create a diamond shaped dag with 1-1 edges.
+ setupPreDagCreation();
+ dagPlan = createDAGPlanForOneToOneSplit(null, numTasks);
+ setupPostDagCreation();
+ VertexImpl v1 = vertices.get("vertex1");
+ initAllVertices(VertexState.INITED);
+
+ // fudge vertex manager so that tasks dont start running
+ v1.vertexManager = new VertexManager(new VertexManagerPluginForTest(),
+ v1, appContext);
+
+ Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks());
+ Assert.assertEquals(numTasks, vertices.get("vertex3").getTotalTasks());
+ Assert.assertEquals(numTasks, vertices.get("vertex4").getTotalTasks());
+ // change parallelism
+ int newNumTasks = 3;
+ v1.setParallelism(newNumTasks, null, null);
+ dispatcher.await();
+ Assert.assertEquals(newNumTasks, vertices.get("vertex2").getTotalTasks());
+ Assert.assertEquals(newNumTasks, vertices.get("vertex3").getTotalTasks());
+ Assert.assertEquals(newNumTasks, vertices.get("vertex4").getTotalTasks());
+ Assert.assertEquals(VertexState.INITED, vertices.get("vertex1").getState());
+ Assert.assertEquals(VertexState.INITED, vertices.get("vertex2").getState());
+ Assert.assertEquals(VertexState.INITED, vertices.get("vertex3").getState());
+ Assert.assertEquals(VertexState.INITED, vertices.get("vertex4").getState());
+
+ startVertex(v1);
+ dispatcher.await();
+
+ Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex1").getState());
+ Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex2").getState());
+ Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex3").getState());
+ Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
+ }
@Test(timeout = 5000)
public void testHistoryEventGeneration() {