You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by sr...@apache.org on 2016/02/18 10:58:04 UTC

[10/24] tez git commit: TEZ-2307. Possible wrong error message when submitting new dag (zjffdu)

TEZ-2307. Possible wrong error message when submitting new dag (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/235841f7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/235841f7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/235841f7

Branch: refs/heads/TEZ-2980
Commit: 235841f77ebf88994c8d7af189cf1000aedbd69f
Parents: 72f5616
Author: Jeff Zhang <zj...@apache.org>
Authored: Tue Feb 2 13:21:45 2016 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Tue Feb 2 15:51:20 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 33 ++++++++++++++------
 2 files changed, 26 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/235841f7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6bff146..c4c04e8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2307. Possible wrong error message when submitting new dag.
   TEZ-2974. Tez tools: TFileRecordReader in tez-tools should support reading >2 GB tfiles.
   TEZ-3081. Update tez website for trademarks feedback.
   TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs
@@ -325,6 +326,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES
+  TEZ-2307. Possible wrong error message when submitting new dag.
   TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs
   TEZ-3066. TaskAttemptFinishedEvent ConcurrentModificationException in recovery or history logging services.
   TEZ-3036. Tez AM can hang on startup with no indication of error

http://git-wip-us.apache.org/repos/asf/tez/blob/235841f7/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index c16bdb9..579d23f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -280,6 +280,7 @@ public class DAGAppMaster extends AbstractService {
   private final UserGroupInformation appMasterUgi;
 
   private AtomicBoolean sessionStopped = new AtomicBoolean(false);
+  private final Object idleStateLock = new Object();
   private long sessionTimeoutInterval;
   private long lastDAGCompletionTime;
   private Timer dagSubmissionTimer;
@@ -811,7 +812,6 @@ public class DAGAppMaster extends AbstractService {
             // Leaving the taskSchedulerEventHandler here for now. Doesn't generate new events.
             // However, eventually it needs to be moved out.
             this.taskSchedulerManager.dagCompleted();
-            state = DAGAppMasterState.IDLE;
           } else {
             LOG.info("Session shutting down now.");
             this.taskSchedulerManager.setShouldUnregisterFlag();
@@ -851,6 +851,10 @@ public class DAGAppMaster extends AbstractService {
       TezDAGID.clearCache();
       LOG.info("Completed cleanup for DAG: name=" + cleanupEvent.getDag().getName() + ", with id=" +
           cleanupEvent.getDag().getID());
+      synchronized (idleStateLock) {
+        state = DAGAppMasterState.IDLE;
+        idleStateLock.notify();
+      }
       break;
     case NEW_DAG_SUBMITTED:
       // Inform sub-components that a new DAG has been submitted.
@@ -1331,21 +1335,33 @@ public class DAGAppMaster extends AbstractService {
       throw new SessionNotRunning("AM unable to accept new DAG submissions."
           + " In the process of shutting down");
     }
+
+    // dag is in cleanup when dag state is completed but AM state is still RUNNING
+    synchronized (idleStateLock) {
+      while (currentDAG != null && currentDAG.isComplete() && state == DAGAppMasterState.RUNNING) {
+        try {
+          LOG.info("wait for previous dag cleanup");
+          idleStateLock.wait();
+        } catch (InterruptedException e) {
+          throw new TezException(e);
+        }
+      }
+    }
+
     synchronized (this) {
       if (this.versionMismatch) {
         throw new TezException("Unable to accept DAG submissions as the ApplicationMaster is"
             + " incompatible with the client. " + versionMismatchDiagnostics);
       }
-      if (currentDAG != null
-          && !state.equals(DAGAppMasterState.IDLE)) {
-        throw new TezException("App master already running a DAG");
-      }
       if (state.equals(DAGAppMasterState.ERROR)
-          || sessionStopped.get()) {
+              || sessionStopped.get()) {
         throw new SessionNotRunning("AM unable to accept new DAG submissions."
-            + " In the process of shutting down");
+                + " In the process of shutting down");
+      }
+      if (currentDAG != null
+          && !currentDAG.isComplete()) {
+        throw new TezException("App master already running a DAG");
       }
-
       // RPC server runs in the context of the job user as it was started in
       // the job user's UGI context
       LOG.info("Starting DAG submitted via RPC: " + dagPlan.getName());
@@ -2445,7 +2461,6 @@ public class DAGAppMaster extends AbstractService {
     }
 
     startDAGExecution(newDAG, lrDiff);
-
     // set state after curDag is set
     this.state = DAGAppMasterState.RUNNING;
   }