You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2020/05/16 15:37:18 UTC
[tez] branch branch-0.9 updated: TEZ-4146: Register RUNNING state in DAG's state change callback (Rajesh Balamohan, reviewed by Gopal V)
This is an automated email from the ASF dual-hosted git repository.
jeagles pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new ad260d0 TEZ-4146: Register RUNNING state in DAG's state change callback (Rajesh Balamohan, reviewed by Gopal V)
ad260d0 is described below
commit ad260d0d980fa0753f4078684fb6e868884b2f8d
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Wed Apr 15 09:25:21 2020 +0530
TEZ-4146: Register RUNNING state in DAG's state change callback (Rajesh Balamohan, reviewed by Gopal V)
(cherry picked from commit f14baf95f35a66566652a17e592bb02b26beba7d)
---
.../main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index a0d4db7..8736bf9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -162,7 +162,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
// TODO Recovery
//private final List<AMInfo> amInfos;
private final Lock dagStatusLock = new ReentrantLock();
- private final Condition dagCompletionCondition = dagStatusLock.newCondition();
+ private final Condition dagStateChangedCondition = dagStatusLock.newCondition();
private final AtomicBoolean isFinalState = new AtomicBoolean(false);
private final Lock readLock;
private final Lock writeLock;
@@ -567,6 +567,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private void augmentStateMachine() {
stateMachine
+ .registerStateEnteredCallback(DAGState.RUNNING,
+ STATE_CHANGED_CALLBACK)
.registerStateEnteredCallback(DAGState.SUCCEEDED,
STATE_CHANGED_CALLBACK)
.registerStateEnteredCallback(DAGState.FAILED,
@@ -581,10 +583,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
implements OnStateChangedCallback<DAGState, DAGImpl> {
@Override
public void onStateChanged(DAGImpl dag, DAGState dagState) {
- dag.isFinalState.set(true);
+ if (dagState != DAGState.RUNNING) {
+ dag.isFinalState.set(true);
+ }
dag.dagStatusLock.lock();
try {
- dag.dagCompletionCondition.signal();
+ dag.dagStateChangedCondition.signal();
} finally {
dag.dagStatusLock.unlock();
}
@@ -944,7 +948,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
if (isFinalState.get()) {
break;
}
- nanosLeft = dagCompletionCondition.awaitNanos(timeoutNanos);
+ nanosLeft = dagStateChangedCondition.awaitNanos(timeoutNanos);
} catch (InterruptedException e) {
throw new TezException("Interrupted while waiting for dag to complete", e);
} finally {