You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/12/03 02:05:15 UTC
git commit: TEZ-658. YarnTezDagChild exits with a non 0 exit code
even if a task succeeds. (sseth)
Updated Branches:
refs/heads/master 74207c4c7 -> 7e3f4c2cd
TEZ-658. YarnTezDagChild exits with a non 0 exit code even if a task
succeeds. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/7e3f4c2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/7e3f4c2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/7e3f4c2c
Branch: refs/heads/master
Commit: 7e3f4c2cdf7976015bff9811752271da1da07839
Parents: 74207c4
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Dec 2 17:04:57 2013 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Dec 2 17:04:57 2013 -0800
----------------------------------------------------------------------
.../apache/hadoop/mapred/YarnTezDagChild.java | 44 +++++++++++++++++---
1 file changed, 39 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7e3f4c2c/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 6e77d37..d2ff43c 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -105,20 +105,21 @@ public class YarnTezDagChild {
private static LinkedBlockingQueue<TezEvent> eventsToSend =
new LinkedBlockingQueue<TezEvent>();
private static AtomicLong requestCounter = new AtomicLong(0);
- private static TezTaskAttemptID currentTaskAttemptID;
private static long amPollInterval;
private static TezTaskUmbilicalProtocol umbilical;
private static ReentrantReadWriteLock taskLock = new ReentrantReadWriteLock();
private static LogicalIOProcessorRuntimeTask currentTask = null;
+ private static TezTaskAttemptID currentTaskAttemptID;
private static AtomicBoolean heartbeatError = new AtomicBoolean(false);
private static Throwable heartbeatErrorException = null;
+ // Implies that the task is done - and the AM is being informed.
+ private static AtomicBoolean currentTaskComplete = new AtomicBoolean(true);
private static Thread startHeartbeatThread() {
Thread heartbeatThread = new Thread(new Runnable() {
public void run() {
while (!(stopped.get() || heartbeatError.get())) {
try {
- Thread.sleep(amPollInterval);
try {
if(!heartbeat()) {
// AM asked us to die
@@ -135,10 +136,15 @@ public class YarnTezDagChild {
// FIXME NEWTEZ maybe send a container failed event to AM?
// Irrecoverable error unless heartbeat sync can be re-established
LOG.error("Heartbeat error in communicating with AM. ", e);
+ if (e instanceof Error) {
+ LOG.error("Exception of type Error. Exiting now");
+ ExitUtil.terminate(-1, e);
+ }
heartbeatErrorException = e;
heartbeatError.set(true);
break;
}
+ Thread.sleep(amPollInterval);
} catch (InterruptedException e) {
// we were interrupted so that we will stop.
LOG.info("Heartbeat thread interrupted. " +
@@ -147,9 +153,14 @@ public class YarnTezDagChild {
}
}
- if (!stopped.get()) {
- // if we are not stopping because the main thread told us to do so,
- // then bring down the entire process
+ if (currentTaskComplete.get() || stopped.get()) {
+ // Don't exit. The Tez framework has control, let the container finish after cleanup etc.
+ // Assumes that a shouldDie signalled via heartbeat will also be reported as shouldDie by getTask.
+ LOG.info("Current task marked as complete. Stopping heartbeat thread and allowing normal container shutdown");
+ return;
+ } else {
+ // Assuming the task is still running, and we've been asked to die or an error occurred.
+ // Stop the process.
if (heartbeatErrorException != null) {
ExitUtil.terminate(-1, heartbeatErrorException);
} else {
@@ -341,10 +352,17 @@ public class YarnTezDagChild {
new TezEvent(new TaskAttemptFailedEvent(diagnostics),
sourceInfo);
try {
+ // Not setting taskComplete - since the main loop responsible for cleanup doesn't have
+ // control yet. Getting control depends on whether the I/P/O returns correctly after
+ // reporting an error.
heartbeat(Collections.singletonList(taskAttemptFailedEvent));
} catch (Throwable t) {
LOG.fatal("Failed to communicate task attempt failure to AM via"
+ " umbilical", t);
+ if (t instanceof Error) {
+ LOG.error("Exception of type Error. Exiting now");
+ ExitUtil.terminate(-1, t);
+ }
// FIXME NEWTEZ maybe send a container failed event to AM?
// Irrecoverable error unless heartbeat sync can be re-established
heartbeatErrorException = t;
@@ -422,6 +440,7 @@ public class YarnTezDagChild {
currentTaskAttemptID = taskSpec.getTaskAttemptID();
TezVertexID newVertexId =
currentTaskAttemptID.getTaskID().getVertexID();
+ currentTaskComplete.set(false);
if (lastVertexId != null) {
if (!lastVertexId.equals(newVertexId)) {
@@ -470,6 +489,9 @@ public class YarnTezDagChild {
LOG.info("Task completed"
+ ", taskAttemptId=" + currentTaskAttemptID
+ ", fatalErrorOccurred=" + currentTask.hadFatalError());
+ // Mark taskComplete - irrespective of failure, framework has control from this point.
+ currentTaskComplete.set(true);
+ // TODO NEWTEZ Should the container continue to run if the running task reported a fatal error?
if (!currentTask.hadFatalError()) {
TezEvent statusUpdateEvent =
new TezEvent(new TaskStatusUpdateEvent(
@@ -503,6 +525,9 @@ public class YarnTezDagChild {
}
}
} catch (FSError e) {
+ // Heartbeats controlled manually after this.
+ stopped.set(true);
+ heartbeatThread.interrupt();
LOG.fatal("FSError from child", e);
// TODO NEWTEZ this should be a container failed event?
try {
@@ -514,12 +539,20 @@ public class YarnTezDagChild {
new TezEvent(new TaskAttemptFailedEvent(
StringUtils.stringifyException(e)),
currentSourceInfo);
+ currentTaskComplete.set(true);
heartbeat(Collections.singletonList(taskAttemptFailedEvent));
}
} finally {
taskLock.readLock().unlock();
}
} catch (Throwable throwable) {
+ // Heartbeats controlled manually after this.
+ if (throwable instanceof Error) {
+ LOG.error("Exception of type Error. Exiting now");
+ ExitUtil.terminate(-1, throwable);
+ }
+ stopped.set(true);
+ heartbeatThread.interrupt();
String cause = StringUtils.stringifyException(throwable);
LOG.fatal("Error running child : " + cause);
taskLock.readLock().lock();
@@ -530,6 +563,7 @@ public class YarnTezDagChild {
TezEvent taskAttemptFailedEvent =
new TezEvent(new TaskAttemptFailedEvent(cause),
currentSourceInfo);
+ currentTaskComplete.set(true);
heartbeat(Collections.singletonList(taskAttemptFailedEvent));
}
} finally {