You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2016/02/02 08:51:58 UTC
tez git commit: TEZ-2307. Possible wrong error message when submitting new dag (zjffdu)
Repository: tez
Updated Branches:
refs/heads/master 72f561639 -> 235841f77
TEZ-2307. Possible wrong error message when submitting new dag (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/235841f7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/235841f7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/235841f7
Branch: refs/heads/master
Commit: 235841f77ebf88994c8d7af189cf1000aedbd69f
Parents: 72f5616
Author: Jeff Zhang <zj...@apache.org>
Authored: Tue Feb 2 13:21:45 2016 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Tue Feb 2 15:51:20 2016 +0800
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../org/apache/tez/dag/app/DAGAppMaster.java | 33 ++++++++++++++------
2 files changed, 26 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/235841f7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6bff146..c4c04e8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2307. Possible wrong error message when submitting new dag
TEZ-2974. Tez tools: TFileRecordReader in tez-tools should support reading >2 GB tfiles.
TEZ-3081. Update tez website for trademarks feedback.
TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs
@@ -325,6 +326,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES
+ TEZ-2307. Possible wrong error message when submitting new dag.
TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs
TEZ-3066. TaskAttemptFinishedEvent ConcurrentModificationException in recovery or history logging services.
TEZ-3036. Tez AM can hang on startup with no indication of error
http://git-wip-us.apache.org/repos/asf/tez/blob/235841f7/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index c16bdb9..579d23f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -280,6 +280,7 @@ public class DAGAppMaster extends AbstractService {
private final UserGroupInformation appMasterUgi;
private AtomicBoolean sessionStopped = new AtomicBoolean(false);
+ private final Object idleStateLock = new Object();
private long sessionTimeoutInterval;
private long lastDAGCompletionTime;
private Timer dagSubmissionTimer;
@@ -811,7 +812,6 @@ public class DAGAppMaster extends AbstractService {
// Leaving the taskSchedulerEventHandler here for now. Doesn't generate new events.
// However, eventually it needs to be moved out.
this.taskSchedulerManager.dagCompleted();
- state = DAGAppMasterState.IDLE;
} else {
LOG.info("Session shutting down now.");
this.taskSchedulerManager.setShouldUnregisterFlag();
@@ -851,6 +851,10 @@ public class DAGAppMaster extends AbstractService {
TezDAGID.clearCache();
LOG.info("Completed cleanup for DAG: name=" + cleanupEvent.getDag().getName() + ", with id=" +
cleanupEvent.getDag().getID());
+ synchronized (idleStateLock) {
+ state = DAGAppMasterState.IDLE;
+ idleStateLock.notify();
+ }
break;
case NEW_DAG_SUBMITTED:
// Inform sub-components that a new DAG has been submitted.
@@ -1331,21 +1335,33 @@ public class DAGAppMaster extends AbstractService {
throw new SessionNotRunning("AM unable to accept new DAG submissions."
+ " In the process of shutting down");
}
+
+ // dag is in cleanup when dag state is completed but AM state is still RUNNING
+ synchronized (idleStateLock) {
+ while (currentDAG != null && currentDAG.isComplete() && state == DAGAppMasterState.RUNNING) {
+ try {
+ LOG.info("wait for previous dag cleanup");
+ idleStateLock.wait();
+ } catch (InterruptedException e) {
+ throw new TezException(e);
+ }
+ }
+ }
+
synchronized (this) {
if (this.versionMismatch) {
throw new TezException("Unable to accept DAG submissions as the ApplicationMaster is"
+ " incompatible with the client. " + versionMismatchDiagnostics);
}
- if (currentDAG != null
- && !state.equals(DAGAppMasterState.IDLE)) {
- throw new TezException("App master already running a DAG");
- }
if (state.equals(DAGAppMasterState.ERROR)
- || sessionStopped.get()) {
+ || sessionStopped.get()) {
throw new SessionNotRunning("AM unable to accept new DAG submissions."
- + " In the process of shutting down");
+ + " In the process of shutting down");
+ }
+ if (currentDAG != null
+ && !currentDAG.isComplete()) {
+ throw new TezException("App master already running a DAG");
}
-
// RPC server runs in the context of the job user as it was started in
// the job user's UGI context
LOG.info("Starting DAG submitted via RPC: " + dagPlan.getName());
@@ -2445,7 +2461,6 @@ public class DAGAppMaster extends AbstractService {
}
startDAGExecution(newDAG, lrDiff);
-
// set state after curDag is set
this.state = DAGAppMasterState.RUNNING;
}