You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/10/15 03:32:39 UTC

git commit: TEZ-541. Containers should exit when a heartbeat fails (bikas)

Updated Branches:
  refs/heads/master ef782312e -> 37bf91880


TEZ-541. Containers should exit when a heartbeat fails (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/37bf9188
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/37bf9188
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/37bf9188

Branch: refs/heads/master
Commit: 37bf91880d0ba55c0c19d1e047d6b4fa577f7928
Parents: ef78231
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Oct 14 18:29:55 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Oct 14 18:29:55 2013 -0700

----------------------------------------------------------------------
 .../apache/hadoop/mapred/YarnTezDagChild.java   | 35 ++++++++++++++------
 1 file changed, 24 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/37bf9188/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 5b4793b..1adb5a8 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -115,13 +116,13 @@ public class YarnTezDagChild {
   private static Thread startHeartbeatThread() {
     Thread heartbeatThread = new Thread(new Runnable() {
       public void run() {
-        while (!stopped.get() && !Thread.currentThread().isInterrupted()
-            && !heartbeatError.get()) {
+        while (!(stopped.get() || heartbeatError.get())) {
           try {
             Thread.sleep(amPollInterval);
             try {
               if(!heartbeat()) {
-                return;
+                // AM asked us to die
+                break;
               }
             } catch (InvalidToken e) {
               // FIXME NEWTEZ maybe send a container failed event to AM?
@@ -129,26 +130,37 @@ public class YarnTezDagChild {
               LOG.error("Heartbeat error in authenticating with AM: ", e);
               heartbeatErrorException = e;
               heartbeatError.set(true);
-              return;
+              break;
             } catch (Throwable e) {
               // FIXME NEWTEZ maybe send a container failed event to AM?
               // Irrecoverable error unless heartbeat sync can be re-established
               LOG.error("Heartbeat error in communicating with AM. ", e);
               heartbeatErrorException = e;
               heartbeatError.set(true);
-              return;
+              break;
             }
           } catch (InterruptedException e) {
-            if (!stopped.get()) {
-              LOG.warn("Heartbeat thread interrupted. Returning.");
-            }
-            return;
+            // we were interrupted so that we will stop.
+            LOG.info("Heartbeat thread interrupted. " +
+            " stopped: " + stopped.get() +  " error: " + heartbeatError.get());
+            continue; 
+          }
+        }
+        
+        if (!stopped.get()) {
+          // if we are not stopping because the main thread told us to do so,
+          // then bring down the entire process
+          if (heartbeatErrorException != null) {
+            ExitUtil.terminate(-1, heartbeatErrorException);
+          } else {
+            ExitUtil.terminate(-1, "Exiting Tez Child Process");
           }
         }
       }
     });
     heartbeatThread.setName("Tez Container Heartbeat Thread ["
         + containerIdStr + "]");
+    heartbeatThread.setDaemon(true);
     heartbeatThread.start();
     return heartbeatThread;
   }
@@ -313,7 +325,7 @@ public class YarnTezDagChild {
       }
     });
 
-    Thread heartbeatThread = startHeartbeatThread();
+    final Thread heartbeatThread = startHeartbeatThread();
 
     TezUmbilical tezUmbilical = new TezUmbilical() {
       @Override
@@ -335,8 +347,9 @@ public class YarnTezDagChild {
               + " umbilical", t);
           // FIXME NEWTEZ maybe send a container failed event to AM?
           // Irrecoverable error unless heartbeat sync can be re-established
-          heartbeatError.set(true);
           heartbeatErrorException = t;
+          heartbeatError.set(true);
+          heartbeatThread.interrupt();
         }
       }