You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@tez.apache.org by ss...@apache.org on 2013/12/03 02:05:15 UTC

git commit: TEZ-658. YarnTezDagChild exits with a non-zero exit code even if a task succeeds. (sseth)

Updated Branches:
  refs/heads/master 74207c4c7 -> 7e3f4c2cd


TEZ-658. YarnTezDagChild exits with a non-zero exit code even if a task
succeeds. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/7e3f4c2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/7e3f4c2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/7e3f4c2c

Branch: refs/heads/master
Commit: 7e3f4c2cdf7976015bff9811752271da1da07839
Parents: 74207c4
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Dec 2 17:04:57 2013 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Dec 2 17:04:57 2013 -0800

----------------------------------------------------------------------
 .../apache/hadoop/mapred/YarnTezDagChild.java   | 44 +++++++++++++++++---
 1 file changed, 39 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7e3f4c2c/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 6e77d37..d2ff43c 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -105,20 +105,21 @@ public class YarnTezDagChild {
   private static LinkedBlockingQueue<TezEvent> eventsToSend =
       new LinkedBlockingQueue<TezEvent>();
   private static AtomicLong requestCounter = new AtomicLong(0);
-  private static TezTaskAttemptID currentTaskAttemptID;
   private static long amPollInterval;
   private static TezTaskUmbilicalProtocol umbilical;
   private static ReentrantReadWriteLock taskLock = new ReentrantReadWriteLock();
   private static LogicalIOProcessorRuntimeTask currentTask = null;
+  private static TezTaskAttemptID currentTaskAttemptID;
   private static AtomicBoolean heartbeatError = new AtomicBoolean(false);
   private static Throwable heartbeatErrorException = null;
+  // Implies that the task is done - and the AM is being informed.
+  private static AtomicBoolean currentTaskComplete = new AtomicBoolean(true);
 
   private static Thread startHeartbeatThread() {
     Thread heartbeatThread = new Thread(new Runnable() {
       public void run() {
         while (!(stopped.get() || heartbeatError.get())) {
           try {
-            Thread.sleep(amPollInterval);
             try {
               if(!heartbeat()) {
                 // AM asked us to die
@@ -135,10 +136,15 @@ public class YarnTezDagChild {
               // FIXME NEWTEZ maybe send a container failed event to AM?
               // Irrecoverable error unless heartbeat sync can be re-established
               LOG.error("Heartbeat error in communicating with AM. ", e);
+              if (e instanceof Error) {
+                LOG.error("Exception of type Error. Exiting now");
+                ExitUtil.terminate(-1, e);
+              }
               heartbeatErrorException = e;
               heartbeatError.set(true);
               break;
             }
+            Thread.sleep(amPollInterval);
           } catch (InterruptedException e) {
             // we were interrupted so that we will stop.
             LOG.info("Heartbeat thread interrupted. " +
@@ -147,9 +153,14 @@ public class YarnTezDagChild {
           }
         }
         
-        if (!stopped.get()) {
-          // if we are not stopping because the main thread told us to do so,
-          // then bring down the entire process
+        if (currentTaskComplete.get() || stopped.get()) {
+          // Don't exit. The Tez framework has control, let the container finish after cleanup etc.
+          // Makes an assumption that a heartbeat shouldDie will be reported as a getTask should die.
+          LOG.info("Current task marked as complete. Stopping heartbeat thread and allowing normal container shutdown");
+          return;
+        } else {
+          // Assuming the task is still running, and we've been asked to die or an error occurred.
+          // Stop the process.
           if (heartbeatErrorException != null) {
             ExitUtil.terminate(-1, heartbeatErrorException);
           } else {
@@ -341,10 +352,17 @@ public class YarnTezDagChild {
             new TezEvent(new TaskAttemptFailedEvent(diagnostics),
                 sourceInfo);
         try {
+          // Not setting taskComplete - since the main loop responsible for cleanup doesn't have
+          // control yet. Getting control depends on whether the I/P/O returns correctly after
+          // reporting an error.
           heartbeat(Collections.singletonList(taskAttemptFailedEvent));
         } catch (Throwable t) {
           LOG.fatal("Failed to communicate task attempt failure to AM via"
               + " umbilical", t);
+          if (t instanceof Error) {
+            LOG.error("Exception of type Error. Exiting now");
+            ExitUtil.terminate(-1, t);
+          }
           // FIXME NEWTEZ maybe send a container failed event to AM?
           // Irrecoverable error unless heartbeat sync can be re-established
           heartbeatErrorException = t;
@@ -422,6 +440,7 @@ public class YarnTezDagChild {
           currentTaskAttemptID = taskSpec.getTaskAttemptID();
           TezVertexID newVertexId =
               currentTaskAttemptID.getTaskID().getVertexID();
+          currentTaskComplete.set(false);
 
           if (lastVertexId != null) {
             if (!lastVertexId.equals(newVertexId)) {
@@ -470,6 +489,9 @@ public class YarnTezDagChild {
               LOG.info("Task completed"
                   + ", taskAttemptId=" + currentTaskAttemptID
                   + ", fatalErrorOccurred=" + currentTask.hadFatalError());
+              // Mark taskComplete - irrespective of failure, framework has control from this point.
+              currentTaskComplete.set(true);
+              // TODONEWTEZ Should the container continue to run if the running task reported a fatal error ?
               if (!currentTask.hadFatalError()) {
                 TezEvent statusUpdateEvent =
                     new TezEvent(new TaskStatusUpdateEvent(
@@ -503,6 +525,9 @@ public class YarnTezDagChild {
         }
       }
     } catch (FSError e) {
+      // Heartbeats controlled manually after this.
+      stopped.set(true);
+      heartbeatThread.interrupt();
       LOG.fatal("FSError from child", e);
       // TODO NEWTEZ this should be a container failed event?
       try {
@@ -514,12 +539,20 @@ public class YarnTezDagChild {
               new TezEvent(new TaskAttemptFailedEvent(
                   StringUtils.stringifyException(e)),
                   currentSourceInfo);
+          currentTaskComplete.set(true);
           heartbeat(Collections.singletonList(taskAttemptFailedEvent));
         }
       } finally {
         taskLock.readLock().unlock();
       }
     } catch (Throwable throwable) {
+      // Heartbeats controlled manually after this.
+      if (throwable instanceof Error) {
+        LOG.error("Exception of type Error. Exiting now");
+        ExitUtil.terminate(-1, throwable);
+      }
+      stopped.set(true);
+      heartbeatThread.interrupt();
       String cause = StringUtils.stringifyException(throwable);
       LOG.fatal("Error running child : " + cause);
       taskLock.readLock().lock();
@@ -530,6 +563,7 @@ public class YarnTezDagChild {
           TezEvent taskAttemptFailedEvent =
             new TezEvent(new TaskAttemptFailedEvent(cause),
               currentSourceInfo);
+          currentTaskComplete.set(true);
           heartbeat(Collections.singletonList(taskAttemptFailedEvent));
         }
       } finally {