You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2020/05/16 15:41:26 UTC
[tez] branch branch-0.9 updated: TEZ-4171. DAGImpl::getDAGStatus
should try to report RUNNING state information correctly
This is an automated email from the ASF dual-hosted git repository.
jeagles pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new c5119cb TEZ-4171. DAGImpl::getDAGStatus should try to report RUNNING state information correctly
c5119cb is described below
commit c5119cbf3ec5fc6a1e8854cf028987b6f5202ea2
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Sat May 16 10:32:07 2020 -0500
TEZ-4171. DAGImpl::getDAGStatus should try to report RUNNING state information correctly
Signed-off-by: Jonathan Eagles <je...@apache.org>
(cherry picked from commit 07c807b2d474eada6afc55f623f2b8613bb891a0)
---
.../java/org/apache/tez/dag/app/dag/impl/DAGImpl.java | 17 ++++++++++++++++-
.../org/apache/tez/dag/app/dag/impl/TestDAGImpl.java | 8 ++++++--
2 files changed, 22 insertions(+), 3 deletions(-)
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 8736bf9..a5723cb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -164,6 +164,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private final Lock dagStatusLock = new ReentrantLock();
private final Condition dagStateChangedCondition = dagStatusLock.newCondition();
private final AtomicBoolean isFinalState = new AtomicBoolean(false);
+ private final AtomicBoolean runningStatusYetToBeConsumed = new AtomicBoolean(false);
private final Lock readLock;
private final Lock writeLock;
private final String dagName;
@@ -583,8 +584,18 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
implements OnStateChangedCallback<DAGState, DAGImpl> {
@Override
public void onStateChanged(DAGImpl dag, DAGState dagState) {
- if (dagState != DAGState.RUNNING) {
+ switch(dagState) {
+ case RUNNING:
+ dag.runningStatusYetToBeConsumed.set(true);
+ break;
+ case SUCCEEDED:
+ case FAILED:
+ case KILLED:
+ case ERROR:
dag.isFinalState.set(true);
+ break;
+ default:
+ break;
}
dag.dagStatusLock.lock();
try {
@@ -948,6 +959,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
if (isFinalState.get()) {
break;
}
+ if (runningStatusYetToBeConsumed.compareAndSet(true, false)) {
+ // No need to wait further, as state just got changed to RUNNING
+ break;
+ }
nanosLeft = dagStateChangedCondition.awaitNanos(timeoutNanos);
} catch (InterruptedException e) {
throw new TezException("Interrupted while waiting for dag to complete", e);
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index c0506de..2f2b3b8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -1762,7 +1762,7 @@ public class TestDAGImpl {
DAGStatusBuilder dagStatus = dag.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000l);
long dagStatusEndTime = System.currentTimeMillis();
long diff = dagStatusEndTime - dagStatusStartTime;
- Assert.assertTrue(diff > 1500 && diff < 2500);
+ Assert.assertTrue(diff >= 0 && diff < 2500);
Assert.assertEquals(DAGStatusBuilder.State.RUNNING, dagStatus.getState());
}
@@ -1805,6 +1805,9 @@ public class TestDAGImpl {
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dag.getState());
Assert.assertEquals(5, dag.getSuccessfulVertices());
+ // Verify that the reported DAG status is in the RUNNING state
+ Assert.assertEquals(DAGStatus.State.RUNNING, dag.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class),
+ 10000L).getState());
ReentrantLock lock = new ReentrantLock();
Condition startCondition = lock.newCondition();
@@ -1851,7 +1854,8 @@ public class TestDAGImpl {
long diff = statusCheckRunnable.dagStatusEndTime - statusCheckRunnable.dagStatusStartTime;
Assert.assertNotNull(statusCheckRunnable.dagStatus);
- Assert.assertTrue(diff > 1000 && diff < 3500);
+ Assert.assertTrue("Status: " + statusCheckRunnable.dagStatus.getState()
+ + ", Diff:" + diff, diff >= 0 && diff < 3500);
Assert.assertEquals(testState, statusCheckRunnable.dagStatus.getState());
t1.join();
}