You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/10/09 04:12:51 UTC
git commit: TEZ-526. Fixes a race in the container heartbeat where the
TaskAttemptId was still being sent after the task had completed. Contributed by
Hitesh Shah.
Updated Branches:
refs/heads/master a181fba3f -> 90ebb1629
TEZ-526. Fixes a race in the container heartbeat where the TaskAttemptId was
still being sent after the task had completed. Contributed by Hitesh Shah.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/90ebb162
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/90ebb162
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/90ebb162
Branch: refs/heads/master
Commit: 90ebb16298a2ab13ce9f1fc9aca7d6e6104b19fa
Parents: a181fba
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Oct 8 19:11:55 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Oct 8 19:11:55 2013 -0700
----------------------------------------------------------------------
.../apache/hadoop/mapred/YarnTezDagChild.java | 46 ++++++++++++++++----
1 file changed, 37 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/90ebb162/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 3846234..e87bbb0 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -124,11 +124,15 @@ public class YarnTezDagChild {
return;
}
} catch (InvalidToken e) {
+ // FIXME NEWTEZ maybe send a container failed event to AM?
+ // Irrecoverable error unless heartbeat sync can be re-established
LOG.error("Heartbeat error in authenticating with AM: ", e);
heartbeatErrorException = e;
heartbeatError.set(true);
return;
} catch (Throwable e) {
+ // FIXME NEWTEZ maybe send a container failed event to AM?
+ // Irrecoverable error unless heartbeat sync can be re-established
LOG.error("Heartbeat error in communicating with AM. ", e);
heartbeatErrorException = e;
heartbeatError.set(true);
@@ -171,6 +175,11 @@ public class YarnTezDagChild {
currentTask.getCounters(), currentTask.getProgress()),
new EventMetaData(EventProducerConsumerType.SYSTEM,
currentTask.getVertexName(), "", taskAttemptID));
+ } else if (outOfBandEvents == null) {
+ LOG.info("Setting TaskAttemptID to null as the task has already"
+ + " completed. Caused by race-condition between the normal"
+ + " heartbeat and out-of-band heartbeats");
+ taskAttemptID = null;
}
}
} finally {
@@ -184,6 +193,7 @@ public class YarnTezDagChild {
if (outOfBandEvents != null && !outOfBandEvents.isEmpty()) {
events.addAll(outOfBandEvents);
}
+
long reqId = requestCounter.incrementAndGet();
TezHeartbeatRequest request = new TezHeartbeatRequest(reqId, events,
containerIdStr, taskAttemptID, eventCounter, eventsRange);
@@ -326,6 +336,8 @@ public class YarnTezDagChild {
} catch (Throwable t) {
LOG.fatal("Failed to communicate task attempt failure to AM via"
+ " umbilical", t);
+ // FIXME NEWTEZ maybe send a container failed event to AM?
+ // Irrecoverable error unless heartbeat sync can be re-established
heartbeatError.set(true);
heartbeatErrorException = t;
}
@@ -470,19 +482,35 @@ public class YarnTezDagChild {
} catch (FSError e) {
LOG.fatal("FSError from child", e);
// TODO NEWTEZ this should be a container failed event?
- TezEvent taskAttemptFailedEvent =
- new TezEvent(new TaskAttemptFailedEvent(
- StringUtils.stringifyException(e)),
- currentSourceInfo);
- heartbeat(Collections.singletonList(taskAttemptFailedEvent));
+ try {
+ taskLock.readLock().lock();
+ if (currentTask != null && !currentTask.hadFatalError()) {
+ // Prevent dup failure events
+ currentTask.setFatalError(e, "FS Error in Child JVM");
+ TezEvent taskAttemptFailedEvent =
+ new TezEvent(new TaskAttemptFailedEvent(
+ StringUtils.stringifyException(e)),
+ currentSourceInfo);
+ heartbeat(Collections.singletonList(taskAttemptFailedEvent));
+ }
+ } finally {
+ taskLock.readLock().unlock();
+ }
} catch (Throwable throwable) {
String cause = StringUtils.stringifyException(throwable);
LOG.fatal("Error running child : " + cause);
- if (currentTaskAttemptID != null && !currentTask.hadFatalError()) {
- TezEvent taskAttemptFailedEvent =
+ taskLock.readLock().lock();
+ try {
+ if (currentTask != null && !currentTask.hadFatalError()) {
+ // Prevent dup failure events
+ currentTask.setFatalError(throwable, "Error in Child JVM");
+ TezEvent taskAttemptFailedEvent =
new TezEvent(new TaskAttemptFailedEvent(cause),
- currentSourceInfo);
- heartbeat(Collections.singletonList(taskAttemptFailedEvent));
+ currentSourceInfo);
+ heartbeat(Collections.singletonList(taskAttemptFailedEvent));
+ }
+ } finally {
+ taskLock.readLock().unlock();
}
} finally {
stopped.set(true);