You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/22 09:26:16 UTC
[17/50] [abbrv] tez git commit: TEZ-2502. Fix for TezTaskRunner2 not
killing tasks properly in all situations. (sseth)
TEZ-2502. Fix for TezTaskRunner2 not killing tasks properly in all situations. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5846b8de
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5846b8de
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5846b8de
Branch: refs/heads/master
Commit: 5846b8de7c224f771ce335622615aadaeb2376d5
Parents: 495ec38
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu May 28 18:29:12 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 21 18:13:55 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../apache/tez/runtime/task/TezTaskRunner2.java | 83 ++++++++++++--------
2 files changed, 53 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/5846b8de/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index e333832..42c2e1e 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -29,5 +29,6 @@ ALL CHANGES:
TEZ-2465. Retrun the status of a kill request in TaskRunner2.
TEZ-2471. NPE in LogicalIOProcessorRuntimeTask while printing thread info.
TEZ-2495. Inform TaskCommunicaor about Task and Container termination reasons.
+ TEZ-2502. Fix for TezTaskRunner2 not killing tasks properly in all situations.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/5846b8de/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index 15629fd..a5fabb5 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -124,6 +124,8 @@ public class TezTaskRunner2 {
try {
ListenableFuture<TaskRunner2CallableResult> future = null;
synchronized (this) {
+ // All running state changes must be made within a synchronized block to ensure
+ // that either a kill is issued for the task or the task is never set up.
if (isRunningState()) {
// Safe to do this within a synchronized block because we're providing
// the handler on which the Reporter will communicate back. Assuming
@@ -252,27 +254,34 @@ public class TezTaskRunner2 {
* @return true if the task kill was honored, false otherwise
*/
public boolean killTask() {
+ boolean isFirstError = false;
synchronized (this) {
if (isRunningState()) {
if (trySettingEndReason(EndReason.KILL_REQUESTED)) {
+ isFirstError = true;
killTaskRequested.set(true);
- if (taskRunnerCallable != null) {
- taskKillStartTime = System.currentTimeMillis();
- taskRunnerCallable.interruptTask();
- }
- return true;
} else {
- LOG.info("Ignoring killTask request for {} since end reason is already set to {}",
- task.getTaskAttemptID(), firstEndReason);
+ logErrorIngored("killTask", null);
}
} else {
- LOG.info("Ignoring killTask request for {} since it is not in a running state",
- task.getTaskAttemptID());
+ logErrorIngored("killTask", null);
}
}
- return false;
+ if (isFirstError) {
+ logAborting("killTask");
+ killTaskInternal();
+ return true;
+ } else {
+ return false;
+ }
}
+ private void killTaskInternal() {
+ if (taskRunnerCallable != null) {
+ taskKillStartTime = System.currentTimeMillis();
+ taskRunnerCallable.interruptTask();
+ }
+ }
// Checks and changes on these states should happen within a synchronized block,
// to ensure the first event is the one that is captured and causes specific behaviour.
@@ -310,17 +319,18 @@ public class TezTaskRunner2 {
errorReporterToAm.set(true);
oobSignalErrorInProgress = true;
} else {
- LOG.info(
- "Ignoring fatal error since the task has ended for reason: {}. IgnoredError: {} ",
- firstEndReason, (t == null ? message : t.getMessage()));
+ logErrorIngored("signalFatalError", message);
}
+ } else {
+ logErrorIngored("signalFatalError", message);
}
}
// Informing the TaskReporter here because the running task may not be interruptable.
// Has to be outside the lock.
if (isFirstError) {
- killTask();
+ logAborting("signalFatalError");
+ killTaskInternal();
try {
taskReporter.taskFailed(taskAttemptID, t, getTaskDiagnosticsString(t, message), sourceInfo);
} catch (IOException e) {
@@ -371,19 +381,22 @@ public class TezTaskRunner2 {
if (trySettingEndReason(EndReason.COMMUNICATION_FAILURE)) {
registerFirstException(t, null);
isFirstError = true;
+ } else {
+ logErrorIngored("umbilicalFatalError", null);
}
// A race is possible between a task succeeding, and a subsequent timed heartbeat failing.
// These errors can be ignored, since a task can only succeed if the synchronous taskSucceeded
// method does not throw an exception, in which case task success is registered with the AM.
// Leave subsequent heartbeat errors to the next entity to communicate using the TaskReporter
} else {
- LOG.info("Ignoring Communication failure since task with id=" + task.getTaskAttemptID()
- + " is already complete, is failing or has been asked to terminate");
+ logErrorIngored("umbilicalFatalError", null);
}
+ // Since this error came from the taskReporter, there is no point attempting to report a failure back to it.
+ // However, the task still needs to be cleaned up.
}
- // Since this error came from the taskReporter - there's no point attempting to report a failure back to it.
if (isFirstError) {
- killTask();
+ logAborting("umbilicalFatalError");
+ killTaskInternal();
}
}
@@ -395,18 +408,12 @@ public class TezTaskRunner2 {
isFirstTerminate = trySettingEndReason(EndReason.CONTAINER_STOP_REQUESTED);
// Respect stopContainerRequested since it can come in at any point, despite a previous failure.
stopContainerRequested.set(true);
-
- if (isFirstTerminate) {
- LOG.info("Attempting to abort {} since a shutdown request was received",
- task.getTaskAttemptID());
- if (taskRunnerCallable != null) {
- taskKillStartTime = System.currentTimeMillis();
- taskRunnerCallable.interruptTask();
- }
- } else {
- LOG.info("Not acting on shutdown request for {} since the task is not in running state",
- task.getTaskAttemptID());
- }
+ }
+ if (isFirstTerminate) {
+ logAborting("shutdownRequested");
+ killTaskInternal();
+ } else {
+ logErrorIngored("shutdownRequested", null);
}
}
}
@@ -451,6 +458,20 @@ public class TezTaskRunner2 {
private void handleFinalStatusUpdateFailure(Throwable t, boolean successReportAttempted) {
// TODO Ideally differentiate between FAILED/KILLED
- LOG.warn("Failure while reporting state= {} to AM", (successReportAttempted ? "success" : "failure/killed"), t);
+ LOG.warn("Failure while reporting state= {} to AM",
+ (successReportAttempted ? "success" : "failure/killed"), t);
+ }
+
+ private void logErrorIngored(String ignoredEndReason, String errorMessage) {
+ LOG.info(
+ "Ignoring {} request since the task with id {} has ended for reason: {}. IgnoredError: {} ",
+ ignoredEndReason, task.getTaskAttemptID(),
+ firstEndReason, (firstException == null ? (errorMessage == null ? "" : errorMessage) :
+ firstException.getMessage()));
+ }
+
+ private void logAborting(String abortReason) {
+ LOG.info("Attempting to abort {} due to an invocation of {}", task.getTaskAttemptID(),
+ abortReason);
}
}
\ No newline at end of file