You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/02/04 20:13:40 UTC

tez git commit: TEZ-2015. VertexImpl.doneReconfiguringVertex() should check other criteria before sending notification (bikas) (cherry picked from commit 7096d8a8f409e3db4f568230a959432e2c9cbb78)

Repository: tez
Updated Branches:
  refs/heads/branch-0.6 8cdd988b2 -> fbd240f4e


TEZ-2015. VertexImpl.doneReconfiguringVertex() should check other criteria before sending notification (bikas)
(cherry picked from commit 7096d8a8f409e3db4f568230a959432e2c9cbb78)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fbd240f4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fbd240f4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fbd240f4

Branch: refs/heads/branch-0.6
Commit: fbd240f4e08cd47f842a2dd98106835ff5f1954d
Parents: 8cdd988
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 4 11:10:23 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 4 11:12:47 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 12 ++--
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 72 ++++++++++++++++++++
 3 files changed, 80 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/fbd240f4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 796ec9a..d369d38 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -125,6 +125,8 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2015. VertexImpl.doneReconfiguringVertex() should check other criteria
+  before sending notification
   TEZ-2011. InputReadyVertexManager not resilient to updates in parallelism 
   TEZ-1934. TestAMRecovery may fail due to the execution order is not determined.
   TEZ-1642. TestAMRecovery sometimes fail.

http://git-wip-us.apache.org/repos/asf/tez/blob/fbd240f4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index a467e4b..506bbc1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1560,11 +1560,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       Preconditions.checkState(vertexToBeReconfiguredByManager, "doneReconfiguringVertex() can be "
           + "invoked only after vertexReconfigurationPlanned() is invoked");
       this.vertexToBeReconfiguredByManager = false;
-      // TEZ-2015 VM may not have configured everything eg. input edge. maybeSendConfiguredEvent()
-      if (completelyConfiguredSent.compareAndSet(false, true)) {
-        // vertex already started and at that time this event was not sent. Send now.
-        stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdate(vertexName,
-            org.apache.tez.dag.api.event.VertexState.CONFIGURED));
+      if (canInitVertex()) {
+        maybeSendConfiguredEvent();
+      } else {
+        Preconditions.checkState(getInternalState() == VertexState.INITIALIZING, "Vertex: "
+            + getLogIdentifier());
       }
     } finally {
       writeLock.unlock();
@@ -3298,7 +3298,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private void maybeSendConfiguredEvent() {
     // the vertex is fully configured by the time it starts. Always notify completely configured
     // unless the vertex manager has told us that it is going to reconfigure it further
-    Preconditions.checkState(canInitVertex());
+    Preconditions.checkState(canInitVertex(), "Vertex: " + getLogIdentifier());
     if (!this.vertexToBeReconfiguredByManager) {
       // this vertex will not be reconfigured by its manager
       if (completelyConfiguredSent.compareAndSet(false, true)) {

http://git-wip-us.apache.org/repos/asf/tez/blob/fbd240f4/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 645708e..e94bb17 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -4949,6 +4949,78 @@ public class TestVertexImpl {
     Assert.assertNotNull(vB.getTask(0));
     Assert.assertNotNull(vC.getTask(0));
   }
+  
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testVertexConfiguredDoneByVMBeforeEdgeDefined() throws Exception {
+    // Race when a source vertex manages to start before the target vertex has
+    // been initialized
+    setupPreDagCreation();
+    dagPlan = createSamplerDAGPlan(true);
+    setupPostDagCreation();
+    
+    VertexImpl vA = vertices.get("A");
+    VertexImpl vB = vertices.get("B");
+    VertexImpl vC = vertices.get("C");
+
+    TestUpdateListener listener = new TestUpdateListener();
+    updateTracker
+        .registerForVertexUpdates(vB.getName(),
+            EnumSet.of(org.apache.tez.dag.api.event.VertexState.CONFIGURED),
+            listener);
+
+    // fudge the vm so we can do custom stuff
+    vB.vertexManager = new VertexManager(
+        VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()),
+        vB, appContext, mock(StateChangeNotifier.class));
+    
+    vB.vertexReconfigurationPlanned();
+    
+    dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+      VertexEventType.V_INIT));
+    dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+      VertexEventType.V_START));
+
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITIALIZING, vA.getState());
+    Assert.assertEquals(VertexState.INITIALIZING, vB.getState());
+    Assert.assertEquals(VertexState.INITIALIZING, vC.getState());
+    
+    // setting the edge manager should vA to start
+    EdgeManagerPluginDescriptor mockEdgeManagerDescriptor =
+        EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName());
+    Edge e = vC.sourceVertices.get(vA);
+    Assert.assertNull(e.getEdgeManager());
+    e.setCustomEdgeManager(mockEdgeManagerDescriptor);
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, vA.getState());
+    Assert.assertEquals(VertexState.INITIALIZING, vB.getState());
+    Assert.assertEquals(VertexState.INITIALIZING, vC.getState());
+    
+    // vB is not configured yet. Edge to C is not configured. So it should not send configured event
+    // even thought VM says its doneConfiguring vertex
+    vB.doneReconfiguringVertex();
+    Assert.assertEquals(0, listener.events.size());
+    
+    // complete configuration and verify getting configured signal from vB
+    Map<String, EdgeManagerPluginDescriptor> edges = Maps.newHashMap();
+    edges.put("B", mockEdgeManagerDescriptor);
+    vC.setParallelism(2, vertexLocationHint, edges, null, true);
+
+    dispatcher.await();
+    Assert.assertEquals(1, listener.events.size());
+    Assert.assertEquals(vB.getName(), listener.events.get(0).getVertexName());
+    Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.CONFIGURED,
+        listener.events.get(0).getVertexState());
+    updateTracker.unregisterForVertexUpdates(vB.getName(), listener);
+    
+    Assert.assertEquals(VertexState.RUNNING, vA.getState());
+    Assert.assertEquals(VertexState.RUNNING, vB.getState());
+    Assert.assertEquals(VertexState.RUNNING, vC.getState());
+    Assert.assertNotNull(vA.getTask(0));
+    Assert.assertNotNull(vB.getTask(0));
+    Assert.assertNotNull(vC.getTask(0));
+  }
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)