You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/06/23 04:27:16 UTC
tez git commit: TEZ-2568. V_INPUT_DATA_INFORMATION may happen after vertex is initialized (zjffdu)
Repository: tez
Updated Branches:
refs/heads/master 4b29ece20 -> 142bd428b
TEZ-2568. V_INPUT_DATA_INFORMATION may happen after vertex is initialized (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/142bd428
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/142bd428
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/142bd428
Branch: refs/heads/master
Commit: 142bd428b44019a5199bec82de5d3d017a5489e7
Parents: 4b29ece
Author: Jeff Zhang <zj...@apache.org>
Authored: Tue Jun 23 10:11:34 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Tue Jun 23 10:11:34 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../java/org/apache/tez/dag/app/dag/impl/VertexImpl.java | 8 ++++++--
.../java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java | 6 ++++--
3 files changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/142bd428/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 25a2450..ac8d8d6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -41,6 +41,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2568. V_INPUT_DATA_INFORMATION may happen after vertex is initialized
TEZ-2291. TEZ UI: Improper vertex name in tables.
TEZ-2567. Tez UI: download dag data does not work within ambari
TEZ-2559. tez-ui fails compilation due to version dependency of frontend-maven-plugin
http://git-wip-us.apache.org/repos/asf/tez/blob/142bd428/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index e909c9f..aa8f593 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -728,6 +728,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
private final List<OutputSpec> additionalOutputSpecs = new ArrayList<OutputSpec>();
private Set<String> inputsWithInitializers;
private int numInitializedInputs;
+ @VisibleForTesting
+ int numInitializerCompletionsHandled = 0;
private boolean startSignalPending = false;
// We may always store task events in the vertex for scalability
List<TezEvent> pendingTaskEvents = Lists.newLinkedList();
@@ -3359,6 +3361,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
@Override
public VertexState transition(VertexImpl vertex, VertexEvent event) {
+ vertex.numInitializerCompletionsHandled++;
VertexEventInputDataInformation iEvent = (VertexEventInputDataInformation) event;
List<TezEvent> inputInfoEvents = iEvent.getEvents();
try {
@@ -3375,8 +3378,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
// done. check if we need to do the initialization
if (vertex.getState() == VertexState.INITIALIZING && vertex.initWaitsForRootInitializers) {
- if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) {
- // set the wait flag to false if all initializers are done
+ if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()
+ && vertex.numInitializerCompletionsHandled == vertex.inputsWithInitializers.size()) {
+ // set the wait flag to false if all initializers are done and InputDataInformation are received from VM
vertex.initWaitsForRootInitializers = false;
}
// initialize vertex if possible and needed
http://git-wip-us.apache.org/repos/asf/tez/blob/142bd428/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index aeea407..8b2a1b4 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -4680,11 +4680,12 @@ public class TestVertexImpl {
initializerManager1.completeInputInitialization(0, 5, v1Hints);
dispatcher.await();
Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
-
+ Assert.assertEquals(1, v1.numInitializerCompletionsHandled);
// Complete second initializer
initializerManager1.completeInputInitialization(1);
dispatcher.await();
Assert.assertEquals(VertexState.INITED, v1.getState());
+ Assert.assertEquals(2, v1.numInitializerCompletionsHandled);
}
@Test(timeout = 5000)
@@ -4709,11 +4710,12 @@ public class TestVertexImpl {
initializerManager1.completeInputInitialization(1);
dispatcher.await();
Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
-
+ Assert.assertEquals(1, v1.numInitializerCompletionsHandled);
// Complete second initializer which sets parallelism
initializerManager1.completeInputInitialization(0, 5, v1Hints);
dispatcher.await();
Assert.assertEquals(VertexState.INITED, v1.getState());
+ Assert.assertEquals(2, v1.numInitializerCompletionsHandled);
}
@Test(timeout = 500000)