You are viewing a plain-text version of this content; the link to the canonical version was lost in conversion.
Posted to commits@tez.apache.org by hi...@apache.org on 2015/06/12 07:55:52 UTC
tez git commit: TEZ-2548. TezClient submitDAG can hang if the AM is in the process of shutting down. (hitesh)
Repository: tez
Updated Branches:
refs/heads/master 64ca681a2 -> bb51e4ed6
TEZ-2548. TezClient submitDAG can hang if the AM is in the process of shutting down. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bb51e4ed
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bb51e4ed
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bb51e4ed
Branch: refs/heads/master
Commit: bb51e4ed65f704f1a03b8420c163d2ee0dffdb81
Parents: 64ca681
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Jun 11 22:55:29 2015 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Jun 11 22:55:29 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 161 ++++++++++---------
2 files changed, 89 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/bb51e4ed/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d42dda8..97b2e9e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
TEZ-2468. Change the minimum Java version to Java 7.
ALL CHANGES:
+ TEZ-2548. TezClient submitDAG can hang if the AM is in the process of shutting down.
TEZ-2473. Consider using RawLocalFileSystem in MapOutput.createDiskMapOutput.
TEZ-2538. ADDITIONAL_SPILL_COUNT wrongly populated for DefaultSorter with multiple partitions.
TEZ-2489. Disable warn log for Timeline ACL error when tez.allow.disabled.timeline-domains set to true.
@@ -36,6 +37,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2548. TezClient submitDAG can hang if the AM is in the process of shutting down.
TEZ-2547. Tez UI: Download Data fails on secure, cross-origin clusters
TEZ-1961. Remove misleading exception "No running dag" from AM logs.
TEZ-2546. Tez UI: Fetch hive query text from timeline if dagInfo is not set.
@@ -245,6 +247,7 @@ Release 0.6.2: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2548. TezClient submitDAG can hang if the AM is in the process of shutting down.
TEZ-2534. Error handling summary event when shutting down AM.
TEZ-2511. Add exitCode to diagnostics when container fails.
TEZ-2489. Disable warn log for Timeline ACL error when tez.allow.disabled.timeline-domains set to true.
http://git-wip-us.apache.org/repos/asf/tez/blob/bb51e4ed/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 4c6b285..7ad8f41 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -59,6 +59,7 @@ import java.util.regex.Pattern;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
+import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDagCleanup;
import org.apache.tez.dag.history.events.DAGRecoveredEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -1172,50 +1173,58 @@ public class DAGAppMaster extends AbstractService {
+ oldState + " new state: " + state);
}
- public synchronized void shutdownTezAM() {
+ public void shutdownTezAM() {
sessionStopped.set(true);
- this.taskSchedulerEventHandler.setShouldUnregisterFlag();
- if (currentDAG != null
- && !currentDAG.isComplete()) {
- //send a DAG_KILL message
- LOG.info("Sending a kill event to the current DAG"
- + ", dagId=" + currentDAG.getID());
- sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.DAG_KILL));
- } else {
- LOG.info("No current running DAG, shutting down the AM");
- if (isSession && !state.equals(DAGAppMasterState.ERROR)) {
- state = DAGAppMasterState.SUCCEEDED;
+ synchronized (this) {
+ this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+ if (currentDAG != null
+ && !currentDAG.isComplete()) {
+ //send a DAG_KILL message
+ LOG.info("Sending a kill event to the current DAG"
+ + ", dagId=" + currentDAG.getID());
+ sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.DAG_KILL));
+ } else {
+ LOG.info("No current running DAG, shutting down the AM");
+ if (isSession && !state.equals(DAGAppMasterState.ERROR)) {
+ state = DAGAppMasterState.SUCCEEDED;
+ }
+ shutdownHandler.shutdown();
}
- shutdownHandler.shutdown();
}
}
- public synchronized String submitDAGToAppMaster(DAGPlan dagPlan,
+ public String submitDAGToAppMaster(DAGPlan dagPlan,
Map<String, LocalResource> additionalResources) throws TezException {
- if (this.versionMismatch) {
- throw new TezException("Unable to accept DAG submissions as the ApplicationMaster is"
- + " incompatible with the client. " + versionMismatchDiagnostics);
- }
- if(currentDAG != null
- && !state.equals(DAGAppMasterState.IDLE)) {
- throw new TezException("App master already running a DAG");
- }
- if (state.equals(DAGAppMasterState.ERROR)
- || sessionStopped.get()) {
- throw new TezException("AM unable to accept new DAG submissions."
+ if (sessionStopped.get()) {
+ throw new SessionNotRunning("AM unable to accept new DAG submissions."
+ " In the process of shutting down");
}
+ synchronized (this) {
+ if (this.versionMismatch) {
+ throw new TezException("Unable to accept DAG submissions as the ApplicationMaster is"
+ + " incompatible with the client. " + versionMismatchDiagnostics);
+ }
+ if (currentDAG != null
+ && !state.equals(DAGAppMasterState.IDLE)) {
+ throw new TezException("App master already running a DAG");
+ }
+ if (state.equals(DAGAppMasterState.ERROR)
+ || sessionStopped.get()) {
+ throw new SessionNotRunning("AM unable to accept new DAG submissions."
+ + " In the process of shutting down");
+ }
- // RPC server runs in the context of the job user as it was started in
- // the job user's UGI context
- LOG.info("Starting DAG submitted via RPC: " + dagPlan.getName());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Invoked with additional local resources: " + additionalResources);
+ // RPC server runs in the context of the job user as it was started in
+ // the job user's UGI context
+ LOG.info("Starting DAG submitted via RPC: " + dagPlan.getName());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Invoked with additional local resources: " + additionalResources);
+ }
+ submittedDAGs.incrementAndGet();
+ startDAG(dagPlan, additionalResources);
+ return currentDAG.getID().toString();
}
- submittedDAGs.incrementAndGet();
- startDAG(dagPlan, additionalResources);
- return currentDAG.getID().toString();
}
@SuppressWarnings("unchecked")
@@ -1821,56 +1830,58 @@ public class DAGAppMaster extends AbstractService {
@Override
- public synchronized void serviceStop() throws Exception {
+ public void serviceStop() throws Exception {
if (isSession) {
sessionStopped.set(true);
}
- if (this.dagSubmissionTimer != null) {
- this.dagSubmissionTimer.cancel();
- }
- stopServices();
+ synchronized (this) {
+ if (this.dagSubmissionTimer != null) {
+ this.dagSubmissionTimer.cancel();
+ }
+ stopServices();
- // Given pre-emption, we should delete tez scratch dir only if unregister is
- // successful
- boolean deleteTezScratchData = this.amConf.getBoolean(
- TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE,
- TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE_DEFAULT);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Checking whether tez scratch data dir should be deleted, deleteTezScratchData="
- + deleteTezScratchData);
- }
- if (deleteTezScratchData && this.taskSchedulerEventHandler != null
- && this.taskSchedulerEventHandler.hasUnregistered()) {
- // Delete tez scratch data dir
- if (this.tezSystemStagingDir != null) {
- try {
- this.appMasterUgi.doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- FileSystem fs = tezSystemStagingDir.getFileSystem(amConf);
- boolean deletedStagingDir = fs.delete(tezSystemStagingDir, true);
- if (!deletedStagingDir) {
- LOG.warn("Failed to delete tez scratch data dir, path="
- + tezSystemStagingDir);
- } else {
- LOG.info("Completed deletion of tez scratch data dir, path="
- + tezSystemStagingDir);
+ // Given pre-emption, we should delete tez scratch dir only if unregister is
+ // successful
+ boolean deleteTezScratchData = this.amConf.getBoolean(
+ TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE,
+ TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE_DEFAULT);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checking whether tez scratch data dir should be deleted, deleteTezScratchData="
+ + deleteTezScratchData);
+ }
+ if (deleteTezScratchData && this.taskSchedulerEventHandler != null
+ && this.taskSchedulerEventHandler.hasUnregistered()) {
+ // Delete tez scratch data dir
+ if (this.tezSystemStagingDir != null) {
+ try {
+ this.appMasterUgi.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ FileSystem fs = tezSystemStagingDir.getFileSystem(amConf);
+ boolean deletedStagingDir = fs.delete(tezSystemStagingDir, true);
+ if (!deletedStagingDir) {
+ LOG.warn("Failed to delete tez scratch data dir, path="
+ + tezSystemStagingDir);
+ } else {
+ LOG.info("Completed deletion of tez scratch data dir, path="
+ + tezSystemStagingDir);
+ }
+ return null;
}
- return null;
- }
- });
- } catch (IOException e) {
- // Best effort to delete tez scratch data dir
- LOG.warn("Failed to delete tez scratch data dir", e);
+ });
+ } catch (IOException e) {
+ // Best effort to delete tez scratch data dir
+ LOG.warn("Failed to delete tez scratch data dir", e);
+ }
}
}
- }
- if (execService != null) {
- execService.shutdownNow();
- }
+ if (execService != null) {
+ execService.shutdownNow();
+ }
- super.serviceStop();
+ super.serviceStop();
+ }
}
private class DagEventDispatcher implements EventHandler<DAGEvent> {