You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/29 03:29:33 UTC

tez git commit: TEZ-2502. Fix for TezTaskRunner2 not killing tasks properly in all situations. (sseth)

Repository: tez
Updated Branches:
  refs/heads/TEZ-2003 1cca40047 -> 9b483ba01


TEZ-2502. Fix for TezTaskRunner2 not killing tasks properly in all situations. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9b483ba0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9b483ba0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9b483ba0

Branch: refs/heads/TEZ-2003
Commit: 9b483ba01289a3c17572a5d5ea1d6ca4cf678d11
Parents: 1cca400
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu May 28 18:29:12 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu May 28 18:29:12 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  1 +
 .../apache/tez/runtime/task/TezTaskRunner2.java | 83 ++++++++++++--------
 2 files changed, 53 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/9b483ba0/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index e333832..42c2e1e 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -29,5 +29,6 @@ ALL CHANGES:
   TEZ-2465. Retrun the status of a kill request in TaskRunner2.
   TEZ-2471. NPE in LogicalIOProcessorRuntimeTask while printing thread info.
   TEZ-2495. Inform TaskCommunicaor about Task and Container termination reasons.
+  TEZ-2502. Fix for TezTaskRunner2 not killing tasks properly in all situations.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/9b483ba0/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index 15629fd..a5fabb5 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -124,6 +124,8 @@ public class TezTaskRunner2 {
     try {
       ListenableFuture<TaskRunner2CallableResult> future = null;
       synchronized (this) {
+        // All running state changes must be made within a synchronized block to ensure
+        // that either a kill is issued or the task is never set up.
         if (isRunningState()) {
           // Safe to do this within a synchronized block because we're providing
           // the handler on which the Reporter will communicate back. Assuming
@@ -252,27 +254,34 @@ public class TezTaskRunner2 {
    * @return true if the task kill was honored, false otherwise
    */
   public boolean killTask() {
+    boolean isFirstError = false;
     synchronized (this) {
       if (isRunningState()) {
         if (trySettingEndReason(EndReason.KILL_REQUESTED)) {
+          isFirstError = true;
           killTaskRequested.set(true);
-          if (taskRunnerCallable != null) {
-            taskKillStartTime = System.currentTimeMillis();
-            taskRunnerCallable.interruptTask();
-          }
-          return true;
         } else {
-          LOG.info("Ignoring killTask request for {} since end reason is already set to {}",
-              task.getTaskAttemptID(), firstEndReason);
+          logErrorIngored("killTask", null);
         }
       } else {
-        LOG.info("Ignoring killTask request for {} since it is not in a running state",
-            task.getTaskAttemptID());
+        logErrorIngored("killTask", null);
       }
     }
-    return false;
+    if (isFirstError) {
+      logAborting("killTask");
+      killTaskInternal();
+      return true;
+    } else {
+      return false;
+    }
   }
 
+  private void killTaskInternal() {
+    if (taskRunnerCallable != null) {
+      taskKillStartTime = System.currentTimeMillis();
+      taskRunnerCallable.interruptTask();
+    }
+  }
 
   // Checks and changes on these states should happen within a synchronized block,
   // to ensure the first event is the one that is captured and causes specific behaviour.
@@ -310,17 +319,18 @@ public class TezTaskRunner2 {
             errorReporterToAm.set(true);
             oobSignalErrorInProgress = true;
           } else {
-            LOG.info(
-                "Ignoring fatal error since the task has ended for reason: {}. IgnoredError: {} ",
-                firstEndReason, (t == null ? message : t.getMessage()));
+            logErrorIngored("signalFatalError", message);
           }
+        } else {
+          logErrorIngored("signalFatalError", message);
         }
       }
 
       // Informing the TaskReporter here because the running task may not be interruptable.
       // Has to be outside the lock.
       if (isFirstError) {
-        killTask();
+        logAborting("signalFatalError");
+        killTaskInternal();
         try {
           taskReporter.taskFailed(taskAttemptID, t, getTaskDiagnosticsString(t, message), sourceInfo);
         } catch (IOException e) {
@@ -371,19 +381,22 @@ public class TezTaskRunner2 {
           if (trySettingEndReason(EndReason.COMMUNICATION_FAILURE)) {
             registerFirstException(t, null);
             isFirstError = true;
+          } else {
+            logErrorIngored("umbilicalFatalError", null);
           }
           // A race is possible between a task succeeding, and a subsequent timed heartbeat failing.
           // These errors can be ignored, since a task can only succeed if the synchronous taskSucceeded
           // method does not throw an exception, in which case task success is registered with the AM.
           // Leave subsequent heartbeat errors to the next entity to communicate using the TaskReporter
         } else {
-          LOG.info("Ignoring Communication failure since task with id=" + task.getTaskAttemptID()
-              + " is already complete, is failing or has been asked to terminate");
+          logErrorIngored("umbilicalFatalError", null);
         }
+        // Since this error came from the taskReporter - there's no point attempting to report a failure back to it.
+        // However, the task still needs to be cleaned up.
       }
-      // Since this error came from the taskReporter - there's no point attempting to report a failure back to it.
       if (isFirstError) {
-        killTask();
+        logAborting("umbilicalFatalError");
+        killTaskInternal();
       }
     }
 
@@ -395,18 +408,12 @@ public class TezTaskRunner2 {
         isFirstTerminate = trySettingEndReason(EndReason.CONTAINER_STOP_REQUESTED);
         // Respect stopContainerRequested since it can come in at any point, despite a previous failure.
         stopContainerRequested.set(true);
-
-        if (isFirstTerminate) {
-          LOG.info("Attempting to abort {} since a shutdown request was received",
-              task.getTaskAttemptID());
-          if (taskRunnerCallable != null) {
-            taskKillStartTime = System.currentTimeMillis();
-            taskRunnerCallable.interruptTask();
-          }
-        } else {
-          LOG.info("Not acting on shutdown request for {} since the task is not in running state",
-              task.getTaskAttemptID());
-        }
+      }
+      if (isFirstTerminate) {
+        logAborting("shutdownRequested");
+        killTaskInternal();
+      } else {
+        logErrorIngored("shutdownRequested", null);
       }
     }
   }
@@ -451,6 +458,20 @@ public class TezTaskRunner2 {
 
   private void handleFinalStatusUpdateFailure(Throwable t, boolean successReportAttempted) {
     // TODO Ideally differentiate between FAILED/KILLED
-    LOG.warn("Failure while reporting state= {} to AM", (successReportAttempted ? "success" : "failure/killed"), t);
+    LOG.warn("Failure while reporting state= {} to AM",
+        (successReportAttempted ? "success" : "failure/killed"), t);
+  }
+
+  private void logErrorIngored(String ignoredEndReason, String errorMessage) {
+    LOG.info(
+        "Ignoring {} request since the task with id {} has ended for reason: {}. IgnoredError: {} ",
+        ignoredEndReason, task.getTaskAttemptID(),
+        firstEndReason, (firstException == null ? (errorMessage == null ? "" : errorMessage) :
+            firstException.getMessage()));
+  }
+
+  private void logAborting(String abortReason) {
+    LOG.info("Attempting to abort {} due to an invocation of {}", task.getTaskAttemptID(),
+        abortReason);
   }
 }
\ No newline at end of file