You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/10/15 03:32:39 UTC
git commit: TEZ-541. Containers should exit when a heartbeat fails
(bikas)
Updated Branches:
refs/heads/master ef782312e -> 37bf91880
TEZ-541. Containers should exit when a heartbeat fails (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/37bf9188
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/37bf9188
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/37bf9188
Branch: refs/heads/master
Commit: 37bf91880d0ba55c0c19d1e047d6b4fa577f7928
Parents: ef78231
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Oct 14 18:29:55 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Oct 14 18:29:55 2013 -0700
----------------------------------------------------------------------
.../apache/hadoop/mapred/YarnTezDagChild.java | 35 ++++++++++++++------
1 file changed, 24 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/37bf9188/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 5b4793b..1adb5a8 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -115,13 +116,13 @@ public class YarnTezDagChild {
private static Thread startHeartbeatThread() {
Thread heartbeatThread = new Thread(new Runnable() {
public void run() {
- while (!stopped.get() && !Thread.currentThread().isInterrupted()
- && !heartbeatError.get()) {
+ while (!(stopped.get() || heartbeatError.get())) {
try {
Thread.sleep(amPollInterval);
try {
if(!heartbeat()) {
- return;
+ // AM asked us to die
+ break;
}
} catch (InvalidToken e) {
// FIXME NEWTEZ maybe send a container failed event to AM?
@@ -129,26 +130,37 @@ public class YarnTezDagChild {
LOG.error("Heartbeat error in authenticating with AM: ", e);
heartbeatErrorException = e;
heartbeatError.set(true);
- return;
+ break;
} catch (Throwable e) {
// FIXME NEWTEZ maybe send a container failed event to AM?
// Irrecoverable error unless heartbeat sync can be re-established
LOG.error("Heartbeat error in communicating with AM. ", e);
heartbeatErrorException = e;
heartbeatError.set(true);
- return;
+ break;
}
} catch (InterruptedException e) {
- if (!stopped.get()) {
- LOG.warn("Heartbeat thread interrupted. Returning.");
- }
- return;
+ // we were interrupted so that we will stop.
+ LOG.info("Heartbeat thread interrupted. " +
+ " stopped: " + stopped.get() + " error: " + heartbeatError.get());
+ continue;
+ }
+ }
+
+ if (!stopped.get()) {
+ // if we are not stopping because the main thread told us to do so,
+ // then bring down the entire process
+ if (heartbeatErrorException != null) {
+ ExitUtil.terminate(-1, heartbeatErrorException);
+ } else {
+ ExitUtil.terminate(-1, "Exiting Tez Child Process");
}
}
}
});
heartbeatThread.setName("Tez Container Heartbeat Thread ["
+ containerIdStr + "]");
+ heartbeatThread.setDaemon(true);
heartbeatThread.start();
return heartbeatThread;
}
@@ -313,7 +325,7 @@ public class YarnTezDagChild {
}
});
- Thread heartbeatThread = startHeartbeatThread();
+ final Thread heartbeatThread = startHeartbeatThread();
TezUmbilical tezUmbilical = new TezUmbilical() {
@Override
@@ -335,8 +347,9 @@ public class YarnTezDagChild {
+ " umbilical", t);
// FIXME NEWTEZ maybe send a container failed event to AM?
// Irrecoverable error unless heartbeat sync can be re-established
- heartbeatError.set(true);
heartbeatErrorException = t;
+ heartbeatError.set(true);
+ heartbeatThread.interrupt();
}
}