You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/06/23 04:27:16 UTC

tez git commit: TEZ-2568. V_INPUT_DATA_INFORMATION may happen after vertex is initialized (zjffdu)

Repository: tez
Updated Branches:
  refs/heads/master 4b29ece20 -> 142bd428b


TEZ-2568. V_INPUT_DATA_INFORMATION may happen after vertex is initialized (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/142bd428
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/142bd428
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/142bd428

Branch: refs/heads/master
Commit: 142bd428b44019a5199bec82de5d3d017a5489e7
Parents: 4b29ece
Author: Jeff Zhang <zj...@apache.org>
Authored: Tue Jun 23 10:11:34 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Tue Jun 23 10:11:34 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                                  | 1 +
 .../java/org/apache/tez/dag/app/dag/impl/VertexImpl.java     | 8 ++++++--
 .../java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java | 6 ++++--
 3 files changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/142bd428/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 25a2450..ac8d8d6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -41,6 +41,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2568. V_INPUT_DATA_INFORMATION may happen after vertex is initialized
   TEZ-2291. TEZ UI: Improper vertex name in tables.
   TEZ-2567. Tez UI: download dag data does not work within ambari
   TEZ-2559. tez-ui fails compilation due to version dependency of frontend-maven-plugin

http://git-wip-us.apache.org/repos/asf/tez/blob/142bd428/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index e909c9f..aa8f593 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -728,6 +728,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   private final List<OutputSpec> additionalOutputSpecs = new ArrayList<OutputSpec>();
   private Set<String> inputsWithInitializers;
   private int numInitializedInputs;
+  @VisibleForTesting
+  int numInitializerCompletionsHandled = 0;
   private boolean startSignalPending = false;
   // We may always store task events in the vertex for scalability
   List<TezEvent> pendingTaskEvents = Lists.newLinkedList();
@@ -3359,6 +3361,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
     @Override
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
+      vertex.numInitializerCompletionsHandled++;
       VertexEventInputDataInformation iEvent = (VertexEventInputDataInformation) event;
       List<TezEvent> inputInfoEvents = iEvent.getEvents();
       try {
@@ -3375,8 +3378,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
       // done. check if we need to do the initialization
       if (vertex.getState() == VertexState.INITIALIZING && vertex.initWaitsForRootInitializers) {
-        if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) {
-          // set the wait flag to false if all initializers are done
+        if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()
+            && vertex.numInitializerCompletionsHandled == vertex.inputsWithInitializers.size()) {
+          // set the wait flag to false once all initializers are done and all InputDataInformation events have been received from the VM
           vertex.initWaitsForRootInitializers = false;
         }
         // initialize vertex if possible and needed

http://git-wip-us.apache.org/repos/asf/tez/blob/142bd428/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index aeea407..8b2a1b4 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -4680,11 +4680,12 @@ public class TestVertexImpl {
     initializerManager1.completeInputInitialization(0, 5, v1Hints);
     dispatcher.await();
     Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
-
+    Assert.assertEquals(1, v1.numInitializerCompletionsHandled);
     // Complete second initializer
     initializerManager1.completeInputInitialization(1);
     dispatcher.await();
     Assert.assertEquals(VertexState.INITED, v1.getState());
+    Assert.assertEquals(2, v1.numInitializerCompletionsHandled);
   }
 
   @Test(timeout = 5000)
@@ -4709,11 +4710,12 @@ public class TestVertexImpl {
     initializerManager1.completeInputInitialization(1);
     dispatcher.await();
     Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
-
+    Assert.assertEquals(1, v1.numInitializerCompletionsHandled);
     // Complete second initializer which sets parallelism
     initializerManager1.completeInputInitialization(0, 5, v1Hints);
     dispatcher.await();
     Assert.assertEquals(VertexState.INITED, v1.getState());
+    Assert.assertEquals(2, v1.numInitializerCompletionsHandled);
   }
 
   @Test(timeout = 500000)