You are viewing a plain-text version of this content; the canonical (hyperlinked) version is not reproduced in this copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/10/09 04:12:51 UTC

git commit: TEZ-526. Fixes a race in container heartbeat where the TaskAttemptId was being sent even after a task had completed. Contributed by Hitesh Shah.

Updated Branches:
  refs/heads/master a181fba3f -> 90ebb1629


TEZ-526. Fixes a race in container heartbeat where the TaskAttemptId was
being sent even after a task had completed. Contributed by Hitesh Shah.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/90ebb162
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/90ebb162
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/90ebb162

Branch: refs/heads/master
Commit: 90ebb16298a2ab13ce9f1fc9aca7d6e6104b19fa
Parents: a181fba
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Oct 8 19:11:55 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Oct 8 19:11:55 2013 -0700

----------------------------------------------------------------------
 .../apache/hadoop/mapred/YarnTezDagChild.java   | 46 ++++++++++++++++----
 1 file changed, 37 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/90ebb162/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 3846234..e87bbb0 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -124,11 +124,15 @@ public class YarnTezDagChild {
                 return;
               }
             } catch (InvalidToken e) {
+              // FIXME NEWTEZ maybe send a container failed event to AM?
+              // Irrecoverable error unless heartbeat sync can be re-established
               LOG.error("Heartbeat error in authenticating with AM: ", e);
               heartbeatErrorException = e;
               heartbeatError.set(true);
               return;
             } catch (Throwable e) {
+              // FIXME NEWTEZ maybe send a container failed event to AM?
+              // Irrecoverable error unless heartbeat sync can be re-established
               LOG.error("Heartbeat error in communicating with AM. ", e);
               heartbeatErrorException = e;
               heartbeatError.set(true);
@@ -171,6 +175,11 @@ public class YarnTezDagChild {
               currentTask.getCounters(), currentTask.getProgress()),
                 new EventMetaData(EventProducerConsumerType.SYSTEM,
                     currentTask.getVertexName(), "", taskAttemptID));
+        } else if (outOfBandEvents == null) {
+          LOG.info("Setting TaskAttemptID to null as the task has already"
+            + " completed. Caused by race-condition between the normal"
+            + " heartbeat and out-of-band heartbeats");
+          taskAttemptID = null;
         }
       }
     } finally {
@@ -184,6 +193,7 @@ public class YarnTezDagChild {
     if (outOfBandEvents != null && !outOfBandEvents.isEmpty()) {
       events.addAll(outOfBandEvents);
     }
+
     long reqId = requestCounter.incrementAndGet();
     TezHeartbeatRequest request = new TezHeartbeatRequest(reqId, events,
         containerIdStr, taskAttemptID, eventCounter, eventsRange);
@@ -326,6 +336,8 @@ public class YarnTezDagChild {
         } catch (Throwable t) {
           LOG.fatal("Failed to communicate task attempt failure to AM via"
               + " umbilical", t);
+          // FIXME NEWTEZ maybe send a container failed event to AM?
+          // Irrecoverable error unless heartbeat sync can be re-established
           heartbeatError.set(true);
           heartbeatErrorException = t;
         }
@@ -470,19 +482,35 @@ public class YarnTezDagChild {
     } catch (FSError e) {
       LOG.fatal("FSError from child", e);
       // TODO NEWTEZ this should be a container failed event?
-      TezEvent taskAttemptFailedEvent =
-          new TezEvent(new TaskAttemptFailedEvent(
-              StringUtils.stringifyException(e)),
-              currentSourceInfo);
-      heartbeat(Collections.singletonList(taskAttemptFailedEvent));
+      try {
+        taskLock.readLock().lock();
+        if (currentTask != null && !currentTask.hadFatalError()) {
+          // Prevent dup failure events
+          currentTask.setFatalError(e, "FS Error in Child JVM");
+          TezEvent taskAttemptFailedEvent =
+              new TezEvent(new TaskAttemptFailedEvent(
+                  StringUtils.stringifyException(e)),
+                  currentSourceInfo);
+          heartbeat(Collections.singletonList(taskAttemptFailedEvent));
+        }
+      } finally {
+        taskLock.readLock().unlock();
+      }
     } catch (Throwable throwable) {
       String cause = StringUtils.stringifyException(throwable);
       LOG.fatal("Error running child : " + cause);
-      if (currentTaskAttemptID != null && !currentTask.hadFatalError()) {
-        TezEvent taskAttemptFailedEvent =
+      taskLock.readLock().lock();
+      try {
+        if (currentTask != null && !currentTask.hadFatalError()) {
+          // Prevent dup failure events
+          currentTask.setFatalError(throwable, "Error in Child JVM");
+          TezEvent taskAttemptFailedEvent =
             new TezEvent(new TaskAttemptFailedEvent(cause),
-                currentSourceInfo);
-        heartbeat(Collections.singletonList(taskAttemptFailedEvent));
+              currentSourceInfo);
+          heartbeat(Collections.singletonList(taskAttemptFailedEvent));
+        }
+      } finally {
+        taskLock.readLock().unlock();
       }
     } finally {
       stopped.set(true);