You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2020/05/16 15:41:26 UTC

[tez] branch branch-0.9 updated: TEZ-4171. DAGImpl::getDAGStatus should try to report RUNNING state information correctly

This is an automated email from the ASF dual-hosted git repository.

jeagles pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new c5119cb  TEZ-4171. DAGImpl::getDAGStatus should try to report RUNNING state information correctly
c5119cb is described below

commit c5119cbf3ec5fc6a1e8854cf028987b6f5202ea2
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Sat May 16 10:32:07 2020 -0500

    TEZ-4171. DAGImpl::getDAGStatus should try to report RUNNING state information correctly
    
    Signed-off-by: Jonathan Eagles <je...@apache.org>
    (cherry picked from commit 07c807b2d474eada6afc55f623f2b8613bb891a0)
---
 .../java/org/apache/tez/dag/app/dag/impl/DAGImpl.java   | 17 ++++++++++++++++-
 .../org/apache/tez/dag/app/dag/impl/TestDAGImpl.java    |  8 ++++++--
 2 files changed, 22 insertions(+), 3 deletions(-)

diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 8736bf9..a5723cb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -164,6 +164,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private final Lock dagStatusLock = new ReentrantLock();
   private final Condition dagStateChangedCondition = dagStatusLock.newCondition();
   private final AtomicBoolean isFinalState = new AtomicBoolean(false);
+  private final AtomicBoolean runningStatusYetToBeConsumed = new AtomicBoolean(false);
   private final Lock readLock;
   private final Lock writeLock;
   private final String dagName;
@@ -583,8 +584,18 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       implements OnStateChangedCallback<DAGState, DAGImpl> {
     @Override
     public void onStateChanged(DAGImpl dag, DAGState dagState) {
-      if (dagState != DAGState.RUNNING) {
+      switch(dagState) {
+      case RUNNING:
+        dag.runningStatusYetToBeConsumed.set(true);
+        break;
+      case SUCCEEDED:
+      case FAILED:
+      case KILLED:
+      case ERROR:
         dag.isFinalState.set(true);
+        break;
+      default:
+        break;
       }
       dag.dagStatusLock.lock();
       try {
@@ -948,6 +959,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         if (isFinalState.get()) {
           break;
         }
+        if (runningStatusYetToBeConsumed.compareAndSet(true, false)) {
+          // No need to wait further, as state just got changed to RUNNING
+          break;
+        }
         nanosLeft = dagStateChangedCondition.awaitNanos(timeoutNanos);
       } catch (InterruptedException e) {
         throw new TezException("Interrupted while waiting for dag to complete", e);
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index c0506de..2f2b3b8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -1762,7 +1762,7 @@ public class TestDAGImpl {
     DAGStatusBuilder dagStatus = dag.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000l);
     long dagStatusEndTime = System.currentTimeMillis();
     long diff = dagStatusEndTime - dagStatusStartTime;
-    Assert.assertTrue(diff > 1500 && diff < 2500);
+    Assert.assertTrue(diff >= 0 && diff < 2500);
     Assert.assertEquals(DAGStatusBuilder.State.RUNNING, dagStatus.getState());
   }
 
@@ -1805,6 +1805,9 @@ public class TestDAGImpl {
     dispatcher.await();
     Assert.assertEquals(DAGState.RUNNING, dag.getState());
     Assert.assertEquals(5, dag.getSuccessfulVertices());
+    // Verify that dagStatus is running state
+    Assert.assertEquals(DAGStatus.State.RUNNING, dag.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class),
+        10000L).getState());
 
     ReentrantLock lock = new ReentrantLock();
     Condition startCondition = lock.newCondition();
@@ -1851,7 +1854,8 @@ public class TestDAGImpl {
 
     long diff = statusCheckRunnable.dagStatusEndTime - statusCheckRunnable.dagStatusStartTime;
     Assert.assertNotNull(statusCheckRunnable.dagStatus);
-    Assert.assertTrue(diff > 1000 && diff < 3500);
+    Assert.assertTrue("Status: " + statusCheckRunnable.dagStatus.getState()
+            + ", Diff:" + diff, diff >= 0 && diff < 3500);
     Assert.assertEquals(testState, statusCheckRunnable.dagStatus.getState());
     t1.join();
   }