You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@tez.apache.org by je...@apache.org on 2020/05/16 15:37:18 UTC

[tez] branch branch-0.9 updated: TEZ-4146: Register RUNNING state in DAG's state change callback (Rajesh Balamohan, reviewed by Gopal V)

This is an automated email from the ASF dual-hosted git repository.

jeagles pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new ad260d0  TEZ-4146: Register RUNNING state in DAG's state change callback (Rajesh Balamohan, reviewed by Gopal V)
ad260d0 is described below

commit ad260d0d980fa0753f4078684fb6e868884b2f8d
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Wed Apr 15 09:25:21 2020 +0530

    TEZ-4146: Register RUNNING state in DAG's state change callback (Rajesh Balamohan, reviewed by Gopal V)
    
    (cherry picked from commit f14baf95f35a66566652a17e592bb02b26beba7d)
---
 .../main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java   | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index a0d4db7..8736bf9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -162,7 +162,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   // TODO Recovery
   //private final List<AMInfo> amInfos;
   private final Lock dagStatusLock = new ReentrantLock();
-  private final Condition dagCompletionCondition = dagStatusLock.newCondition();
+  private final Condition dagStateChangedCondition = dagStatusLock.newCondition();
   private final AtomicBoolean isFinalState = new AtomicBoolean(false);
   private final Lock readLock;
   private final Lock writeLock;
@@ -567,6 +567,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
   private void augmentStateMachine() {
     stateMachine
+        .registerStateEnteredCallback(DAGState.RUNNING,
+            STATE_CHANGED_CALLBACK)
         .registerStateEnteredCallback(DAGState.SUCCEEDED,
             STATE_CHANGED_CALLBACK)
         .registerStateEnteredCallback(DAGState.FAILED,
@@ -581,10 +583,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       implements OnStateChangedCallback<DAGState, DAGImpl> {
     @Override
     public void onStateChanged(DAGImpl dag, DAGState dagState) {
-      dag.isFinalState.set(true);
+      if (dagState != DAGState.RUNNING) {
+        dag.isFinalState.set(true);
+      }
       dag.dagStatusLock.lock();
       try {
-        dag.dagCompletionCondition.signal();
+        dag.dagStateChangedCondition.signal();
       } finally {
         dag.dagStatusLock.unlock();
       }
@@ -944,7 +948,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         if (isFinalState.get()) {
           break;
         }
-        nanosLeft = dagCompletionCondition.awaitNanos(timeoutNanos);
+        nanosLeft = dagStateChangedCondition.awaitNanos(timeoutNanos);
       } catch (InterruptedException e) {
         throw new TezException("Interrupted while waiting for dag to complete", e);
       } finally {