You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/22 03:19:07 UTC

[17/50] [abbrv] tez git commit: TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks. (sseth)

TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3d795d62
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3d795d62
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3d795d62

Branch: refs/heads/TEZ-2003
Commit: 3d795d62c0d471ff866adb9002e8c6fe6ea9536a
Parents: 7cf416a
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue May 12 14:27:42 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 21 18:13:55 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                                   |  1 +
 .../java/org/apache/tez/dag/api/TaskCommunicator.java  |  4 ++++
 .../tez/runtime/LogicalIOProcessorRuntimeTask.java     | 11 ++++++-----
 .../main/java/org/apache/tez/runtime/RuntimeTask.java  |  2 +-
 .../apache/tez/runtime/task/TaskRunner2Callable.java   | 13 +++++++------
 .../org/apache/tez/runtime/task/TezTaskRunner2.java    | 10 ++++++----
 6 files changed, 25 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3d795d62/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 5d2e40a..ed72d6b 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -25,5 +25,6 @@ ALL CHANGES:
   TEZ-2433. Fixes after rebase 05/08
   TEZ-2438. tez-tools version in the branch is incorrect.
   TEZ-2434. Allow tasks to be killed in the Runtime.
+  TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/3d795d62/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index cadca0c..2651013 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -48,6 +48,10 @@ public abstract class TaskCommunicator extends AbstractService {
   // TODO TEZ-2003. Are additional APIs required to mark a container as completed ? - for completeness.
 
   // TODO TEZ-2003 Remove reference to TaskAttemptID
+  // TODO TEZ-2003 This needs some information about why the attempt is being unregistered,
+  // e.g. it was preempted (in which case the task may need to be informed), or the
+  // task failed.
+  // In case of preemption, a killTask API is likely a better bet than trying to overload this method.
   public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID);
 
   // TODO TEZ-2003 This doesn't necessarily belong here. A server may not start within the AM.

http://git-wip-us.apache.org/repos/asf/tez/blob/3d795d62/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 8263b3f..de08e56 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -704,7 +704,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   @Override
-  public synchronized void abortTask() throws Exception {
+  public synchronized void abortTask() {
     if (processor != null) {
       processor.abort();
     }
@@ -803,6 +803,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       LOG.debug("Num of inputs to be closed={}", initializedInputs.size());
       LOG.debug("Num of outputs to be closed={}", initializedOutputs.size());
     }
+
     // Close processor
     if (!processorClosed && processor != null) {
       try {
@@ -820,8 +821,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         Thread.currentThread().interrupt();
       } catch (Throwable e) {
         LOG.warn(
-            "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}",
-                e.getClass().getName(), e.getMessage());
+            "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}",
+                e.getClass().getName(), e.getMessage(), e);
       }
     }
 
@@ -842,7 +843,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       } catch (Throwable e) {
         LOG.warn(
             "Ignoring exception when closing input {}(cleanup). Exception class={}, message={}",
-            srcVertexName, e.getClass().getName(), e.getMessage());
+            srcVertexName, e.getClass().getName(), e.getMessage(), e);
       } finally {
         LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
             .getContext().getTaskVertexName(), srcVertexName, Thread.currentThread().isInterrupted());
@@ -866,7 +867,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       } catch (Throwable e) {
         LOG.warn(
             "Ignoring exception when closing output {}(cleanup). Exception class={}, message={}",
-            destVertexName, e.getClass().getName(), e.getMessage());
+            destVertexName, e.getClass().getName(), e.getMessage(), e);
       } finally {
         LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
             .getContext().getTaskVertexName(), destVertexName, Thread.currentThread().isInterrupted());

http://git-wip-us.apache.org/repos/asf/tez/blob/3d795d62/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index cdfb46a..33c0113 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -167,5 +167,5 @@ public abstract class RuntimeTask {
     taskDone.set(true);
   }
 
-  public abstract void abortTask() throws Exception;
+  public abstract void abortTask();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/3d795d62/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
index 7315bbd..ab77635 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
@@ -63,26 +63,26 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas
           if (stopRequested.get() || Thread.currentThread().isInterrupted()) {
             return new TaskRunner2CallableResult(null);
           }
-          LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID());
+          LOG.info("Initializing task" + ", taskAttemptId={}", task.getTaskAttemptID());
           task.initialize();
 
           if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) {
-            LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID());
+            LOG.info("Running task, taskAttemptId={}", task.getTaskAttemptID());
             task.run();
           } else {
-            LOG.info("Stopped before running the processor.");
+            LOG.info("Stopped before running the processor taskAttemptId={}", task.getTaskAttemptID());
             return new TaskRunner2CallableResult(null);
           }
 
           if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) {
-            LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID());
+            LOG.info("Closing task, taskAttemptId={}", task.getTaskAttemptID());
             task.close();
             task.setFrameworkCounters();
           } else {
-            LOG.info("Stopped before closing the processor");
+            LOG.info("Stopped before closing the processor, taskAttemptId={}", task.getTaskAttemptID());
             return new TaskRunner2CallableResult(null);
           }
-          LOG.info("Task completed, taskAttemptId=" + task.getTaskAttemptID() + ", askedToStop=" + stopRequested.get());
+          LOG.info("Task completed, taskAttemptId={}, askedToStop={}", task.getTaskAttemptID(), stopRequested.get());
 
 
           return new TaskRunner2CallableResult(null);
@@ -115,6 +115,7 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas
   public void interruptTask() {
     // Ensure the task is only interrupted once.
     if (!stopRequested.getAndSet(true)) {
+      task.abortTask();
       if (ownThread != null) {
         ownThread.interrupt();
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/3d795d62/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index 73e5c76..ffbc6e8 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -250,10 +250,12 @@ public class TezTaskRunner2 {
   public void killTask() {
     synchronized (this) {
       if (isRunningState()) {
-        trySettingEndReason(EndReason.KILL_REQUESTED);
-        if (taskRunnerCallable != null) {
-          taskKillStartTime = System.currentTimeMillis();
-          taskRunnerCallable.interruptTask();
+        if (trySettingEndReason(EndReason.KILL_REQUESTED)) {
+          killTaskRequested.set(true);
+          if (taskRunnerCallable != null) {
+            taskKillStartTime = System.currentTimeMillis();
+            taskRunnerCallable.interruptTask();
+          }
         }
       }
     }