You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2020/05/12 08:31:43 UTC

[tez] branch master updated: TEZ-4173: isSetParallelismCalled should be checked before skipping vertex reinit (Syed Shameerur Rahman via László Bodor)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new bc6a961  TEZ-4173: isSetParallelismCalled should be checked before skipping vertex reinit (Syed Shameerur Rahman via László Bodor)
bc6a961 is described below

commit bc6a961229a9e9ab8c67bb978e5da745de32b1e1
Author: Syed Shameerur Rahman <sr...@qubole.com>
AuthorDate: Tue May 12 10:30:18 2020 +0200

    TEZ-4173: isSetParallelismCalled should be checked before skipping vertex reinit (Syed Shameerur Rahman via László Bodor)
    
    Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
 .../apache/tez/dag/app/dag/impl/VertexImpl.java    |  6 ++--
 .../tez/dag/app/dag/impl/TestDAGRecovery.java      | 42 ++++++++++++++++++++--
 2 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index b67809e..85ae38d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -2826,10 +2826,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     //        -  Why using VertexReconfigureDoneEvent
     //           -  VertexReconfigureDoneEvent represent the case that user use API reconfigureVertex
     //              VertexReconfigureDoneEvent will be logged
-    //   - TaskStartEvent is seen in that vertex
+    //   - TaskStartEvent is seen in that vertex or setVertexParallelism is called
     //   - All the parent vertices have skipped initializing stage while recovering
     if (recoveryData != null && recoveryData.shouldSkipInit()
-        && recoveryData.isVertexTasksStarted() && isVertexInitSkippedInParentVertices()) {
+        && (recoveryData.isVertexTasksStarted() ||
+        recoveryData.getVertexConfigurationDoneEvent().isSetParallelismCalled())
+        && isVertexInitSkippedInParentVertices()) {
       // Replace the original VertexManager with NoOpVertexManager if the reconfiguration is done in the last AM attempt
       VertexConfigurationDoneEvent reconfigureDoneEvent = recoveryData.getVertexConfigurationDoneEvent();
       if (LOG.isInfoEnabled()) {
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index fcf6db8..95ea8a0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -746,7 +746,7 @@ public class TestDAGRecovery {
         "vertex1", 0L, v1InitedTime, 
         v1NumTask, "", null, inputGeneratedTezEvents, null);
     VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id, 
-        0L, v1NumTask, null, null, null, true);
+        0L, v1NumTask, null, null, null, false);
     VertexRecoveryData vertexRecoveryData = new VertexRecoveryData(v1InitedEvent,
         v1ReconfigureDoneEvent, null, null, new HashMap<TezTaskID, TaskRecoveryData>(), false);
     doReturn(vertexRecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
@@ -784,7 +784,7 @@ public class TestDAGRecovery {
         "vertex1", 0L, v1InitedTime,
         v1NumTask, "", null, inputGeneratedTezEvents, null);
     VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id,
-        0L, v1NumTask, null, null, null, true);
+        0L, v1NumTask, null, null, null, false);
     VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L, v1StartedTime);
     VertexRecoveryData vertexRecoveryData = new VertexRecoveryData(v1InitedEvent,
         v1ReconfigureDoneEvent, v1StartedEvent, null, new HashMap<TezTaskID, TaskRecoveryData>(), false);
@@ -807,6 +807,44 @@ public class TestDAGRecovery {
   /**
    * RecoveryEvents:
    *  DAG:  DAGInitedEvent -> DAGStartedEvent
+   *  V1:   VertexReconfigrationDoneEvent -> VertexInitializedEvent -> VertexStartedEvent -> setParallelismCalledFlag
+   *
+   * V1 skip initialization.
+   */
+  @Test(timeout=5000)
+  public void testVertexRecoverWithSetParallelismCalledFlag() {
+    initMockDAGRecoveryDataForVertex();
+    List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
+    VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
+        "vertex1", 0L, v1InitedTime,
+        v1NumTask, "", null, inputGeneratedTezEvents, null);
+    VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id,
+        0L, v1NumTask, null, null, null, true);
+    VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L, v1StartedTime);
+    VertexRecoveryData vertexRecoveryData = new VertexRecoveryData(v1InitedEvent,
+        v1ReconfigureDoneEvent, v1StartedEvent, null, new HashMap<TezTaskID, TaskRecoveryData>(), false);
+    doReturn(vertexRecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
+
+    DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
+    dag.handle(recoveryEvent);
+    dispatcher.await();
+
+    VertexImpl v1 = (VertexImpl)dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl)dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl)dag.getVertex("vertex3");
+    assertEquals(DAGState.RUNNING, dag.getState());
+    // v1 skip initialization
+    assertEquals(VertexState.RUNNING, v1.getState());
+    assertEquals(v1InitedTime, v1.initedTime);
+    assertEquals(v1StartedTime, v1.startedTime);
+    assertEquals(v1NumTask, v1.getTotalTasks());
+    assertEquals(VertexState.RUNNING, v2.getState());
+    assertEquals(VertexState.RUNNING, v3.getState());
+  }
+
+  /**
+   * RecoveryEvents:
+   *  DAG:  DAGInitedEvent -> DAGStartedEvent
    *  V1:   VertexReconfigrationDoneEvent -> VertexInitializedEvent -> VertexStartedEvent -> VertexTaskStart
    *
    * V1 skip initialization.