You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@tez.apache.org by hi...@apache.org on 2015/06/12 07:55:52 UTC

tez git commit: TEZ-2548. TezClient submitDAG can hang if the AM is in the process of shutting down. (hitesh)

Repository: tez
Updated Branches:
  refs/heads/master 64ca681a2 -> bb51e4ed6


TEZ-2548. TezClient submitDAG can hang if the AM is in the process of shutting down. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bb51e4ed
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bb51e4ed
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bb51e4ed

Branch: refs/heads/master
Commit: bb51e4ed65f704f1a03b8420c163d2ee0dffdb81
Parents: 64ca681
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Jun 11 22:55:29 2015 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Jun 11 22:55:29 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 161 ++++++++++---------
 2 files changed, 89 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/bb51e4ed/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d42dda8..97b2e9e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.
 
 ALL CHANGES:
+  TEZ-2548. TezClient submitDAG can hang if the AM is in the process of shutting down.
   TEZ-2473. Consider using RawLocalFileSystem in MapOutput.createDiskMapOutput.
   TEZ-2538. ADDITIONAL_SPILL_COUNT wrongly populated for DefaultSorter with multiple partitions.
   TEZ-2489. Disable warn log for Timeline ACL error when tez.allow.disabled.timeline-domains set to true.
@@ -36,6 +37,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2548. TezClient submitDAG can hang if the AM is in the process of shutting down.
   TEZ-2547. Tez UI: Download Data fails on secure, cross-origin clusters
   TEZ-1961. Remove misleading exception "No running dag" from AM logs.
   TEZ-2546. Tez UI: Fetch hive query text from timeline if dagInfo is not set.
@@ -245,6 +247,7 @@ Release 0.6.2: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2548. TezClient submitDAG can hang if the AM is in the process of shutting down.
   TEZ-2534. Error handling summary event when shutting down AM.
   TEZ-2511. Add exitCode to diagnostics when container fails.
   TEZ-2489. Disable warn log for Timeline ACL error when tez.allow.disabled.timeline-domains set to true.

http://git-wip-us.apache.org/repos/asf/tez/blob/bb51e4ed/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 4c6b285..7ad8f41 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -59,6 +59,7 @@ import java.util.regex.Pattern;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Options;
+import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDagCleanup;
 import org.apache.tez.dag.history.events.DAGRecoveredEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -1172,50 +1173,58 @@ public class DAGAppMaster extends AbstractService {
         + oldState + " new state: " + state);
   }
 
-  public synchronized void shutdownTezAM() {
+  public void shutdownTezAM() {
     sessionStopped.set(true);
-    this.taskSchedulerEventHandler.setShouldUnregisterFlag();
-    if (currentDAG != null
-        && !currentDAG.isComplete()) {
-      //send a DAG_KILL message
-      LOG.info("Sending a kill event to the current DAG"
-          + ", dagId=" + currentDAG.getID());
-      sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.DAG_KILL));
-    } else {
-      LOG.info("No current running DAG, shutting down the AM");
-      if (isSession && !state.equals(DAGAppMasterState.ERROR)) {
-        state = DAGAppMasterState.SUCCEEDED;
+    synchronized (this) {
+      this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+      if (currentDAG != null
+          && !currentDAG.isComplete()) {
+        //send a DAG_KILL message
+        LOG.info("Sending a kill event to the current DAG"
+            + ", dagId=" + currentDAG.getID());
+        sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.DAG_KILL));
+      } else {
+        LOG.info("No current running DAG, shutting down the AM");
+        if (isSession && !state.equals(DAGAppMasterState.ERROR)) {
+          state = DAGAppMasterState.SUCCEEDED;
+        }
+        shutdownHandler.shutdown();
       }
-      shutdownHandler.shutdown();
     }
   }
 
-  public synchronized String submitDAGToAppMaster(DAGPlan dagPlan,
+  public String submitDAGToAppMaster(DAGPlan dagPlan,
       Map<String, LocalResource> additionalResources) throws TezException {
-    if (this.versionMismatch) {
-      throw new TezException("Unable to accept DAG submissions as the ApplicationMaster is"
-          + " incompatible with the client. " + versionMismatchDiagnostics);
-    }
-    if(currentDAG != null
-        && !state.equals(DAGAppMasterState.IDLE)) {
-      throw new TezException("App master already running a DAG");
-    }
-    if (state.equals(DAGAppMasterState.ERROR)
-        || sessionStopped.get()) {
-      throw new TezException("AM unable to accept new DAG submissions."
+    if (sessionStopped.get()) {
+      throw new SessionNotRunning("AM unable to accept new DAG submissions."
           + " In the process of shutting down");
     }
+    synchronized (this) {
+      if (this.versionMismatch) {
+        throw new TezException("Unable to accept DAG submissions as the ApplicationMaster is"
+            + " incompatible with the client. " + versionMismatchDiagnostics);
+      }
+      if (currentDAG != null
+          && !state.equals(DAGAppMasterState.IDLE)) {
+        throw new TezException("App master already running a DAG");
+      }
+      if (state.equals(DAGAppMasterState.ERROR)
+          || sessionStopped.get()) {
+        throw new SessionNotRunning("AM unable to accept new DAG submissions."
+            + " In the process of shutting down");
+      }
 
-    // RPC server runs in the context of the job user as it was started in
-    // the job user's UGI context
-    LOG.info("Starting DAG submitted via RPC: " + dagPlan.getName());
-    
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Invoked with additional local resources: " + additionalResources);
+      // RPC server runs in the context of the job user as it was started in
+      // the job user's UGI context
+      LOG.info("Starting DAG submitted via RPC: " + dagPlan.getName());
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Invoked with additional local resources: " + additionalResources);
+      }
+      submittedDAGs.incrementAndGet();
+      startDAG(dagPlan, additionalResources);
+      return currentDAG.getID().toString();
     }
-    submittedDAGs.incrementAndGet();
-    startDAG(dagPlan, additionalResources);
-    return currentDAG.getID().toString();
   }
 
   @SuppressWarnings("unchecked")
@@ -1821,56 +1830,58 @@ public class DAGAppMaster extends AbstractService {
 
 
   @Override
-  public synchronized void serviceStop() throws Exception {
+  public void serviceStop() throws Exception {
     if (isSession) {
       sessionStopped.set(true);
     }
-    if (this.dagSubmissionTimer != null) {
-      this.dagSubmissionTimer.cancel();
-    }
-    stopServices();
+    synchronized (this) {
+      if (this.dagSubmissionTimer != null) {
+        this.dagSubmissionTimer.cancel();
+      }
+      stopServices();
 
-    // Given pre-emption, we should delete tez scratch dir only if unregister is
-    // successful
-    boolean deleteTezScratchData = this.amConf.getBoolean(
-        TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE,
-        TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE_DEFAULT);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Checking whether tez scratch data dir should be deleted, deleteTezScratchData="
-          + deleteTezScratchData);
-    }
-    if (deleteTezScratchData && this.taskSchedulerEventHandler != null
-        && this.taskSchedulerEventHandler.hasUnregistered()) {
-      // Delete tez scratch data dir
-      if (this.tezSystemStagingDir != null) {
-        try {
-          this.appMasterUgi.doAs(new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-              FileSystem fs = tezSystemStagingDir.getFileSystem(amConf);
-              boolean deletedStagingDir = fs.delete(tezSystemStagingDir, true);
-              if (!deletedStagingDir) {
-                LOG.warn("Failed to delete tez scratch data dir, path="
-                + tezSystemStagingDir);
-              } else {
-                LOG.info("Completed deletion of tez scratch data dir, path="
-                  + tezSystemStagingDir);
+      // Given pre-emption, we should delete tez scratch dir only if unregister is
+      // successful
+      boolean deleteTezScratchData = this.amConf.getBoolean(
+          TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE,
+          TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE_DEFAULT);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Checking whether tez scratch data dir should be deleted, deleteTezScratchData="
+            + deleteTezScratchData);
+      }
+      if (deleteTezScratchData && this.taskSchedulerEventHandler != null
+          && this.taskSchedulerEventHandler.hasUnregistered()) {
+        // Delete tez scratch data dir
+        if (this.tezSystemStagingDir != null) {
+          try {
+            this.appMasterUgi.doAs(new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                FileSystem fs = tezSystemStagingDir.getFileSystem(amConf);
+                boolean deletedStagingDir = fs.delete(tezSystemStagingDir, true);
+                if (!deletedStagingDir) {
+                  LOG.warn("Failed to delete tez scratch data dir, path="
+                      + tezSystemStagingDir);
+                } else {
+                  LOG.info("Completed deletion of tez scratch data dir, path="
+                      + tezSystemStagingDir);
+                }
+                return null;
               }
-              return null;
-            }
-          });
-        } catch (IOException e) {
-          // Best effort to delete tez scratch data dir
-          LOG.warn("Failed to delete tez scratch data dir", e);
+            });
+          } catch (IOException e) {
+            // Best effort to delete tez scratch data dir
+            LOG.warn("Failed to delete tez scratch data dir", e);
+          }
         }
       }
-    }
 
-    if (execService != null) {
-      execService.shutdownNow();
-    }
+      if (execService != null) {
+        execService.shutdownNow();
+      }
 
-    super.serviceStop();
+      super.serviceStop();
+    }
   }
 
   private class DagEventDispatcher implements EventHandler<DAGEvent> {