You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/12 23:28:04 UTC

tez git commit: TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks. (sseth)

Repository: tez
Updated Branches:
  refs/heads/TEZ-2003 b787648ed -> 5f9653233


TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5f965323
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5f965323
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5f965323

Branch: refs/heads/TEZ-2003
Commit: 5f96532332eda1246412c3e0af3661aa9179ce96
Parents: b787648
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue May 12 14:27:42 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue May 12 14:27:42 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  1 +
 .../apache/tez/dag/api/TaskCommunicator.java    |  4 ++
 .../runtime/LogicalIOProcessorRuntimeTask.java  | 75 ++++++++++----------
 .../org/apache/tez/runtime/RuntimeTask.java     |  2 +-
 .../tez/runtime/task/TaskRunner2Callable.java   | 13 ++--
 .../apache/tez/runtime/task/TezTaskRunner2.java | 10 +--
 6 files changed, 56 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/5f965323/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 5d2e40a..ed72d6b 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -25,5 +25,6 @@ ALL CHANGES:
   TEZ-2433. Fixes after rebase 05/08
   TEZ-2438. tez-tools version in the branch is incorrect.
   TEZ-2434. Allow tasks to be killed in the Runtime.
+  TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/5f965323/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index cadca0c..2651013 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -48,6 +48,10 @@ public abstract class TaskCommunicator extends AbstractService {
   // TODO TEZ-2003. Are additional APIs required to mark a container as completed ? - for completeness.
 
   // TODO TEZ-2003 Remove reference to TaskAttemptID
+  // TODO TEZ-2003 This needs some information about why the attempt is being unregistered,
+  // e.g. because it was preempted (in which case the task may need to be informed), or as a
+  // result of a failed task.
+  // In case of preemption, a killTask API is likely a better bet than trying to overload this method.
   public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID);
 
   // TODO TEZ-2003 This doesn't necessarily belong here. A server may not start within the AM.

http://git-wip-us.apache.org/repos/asf/tez/blob/5f965323/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 9a0e397..f32da76 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -119,7 +119,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   final ConcurrentHashMap<String, LogicalInput> initializedInputs;
   final ConcurrentHashMap<String, LogicalOutput> initializedOutputs;
 
-  private boolean processorClosed;
+  private boolean processorClosed = false;
   final ProcessorDescriptor processorDescriptor;
   AbstractLogicalIOProcessor processor;
   ProcessorContext processorContext;
@@ -693,7 +693,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   @Override
-  public synchronized void abortTask() throws Exception {
+  public synchronized void abortTask() {
     if (processor != null) {
       processor.abort();
     }
@@ -791,7 +791,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       LOG.debug("Num of inputs to be closed={}", initializedInputs.size());
       LOG.debug("Num of outputs to be closed={}", initializedOutputs.size());
     }
-    if (!processorClosed) {
+    if (!processorClosed && processor != null) {
       try {
         processorClosed = true;
         processor.close();
@@ -808,19 +808,20 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         LOG.info("Resetting interrupt for processor");
         Thread.currentThread().interrupt();
       } catch (Throwable e) {
-        LOG.warn("Exception when closing processor", e);
+        LOG.warn(
+            "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}" +
+                e.getClass().getName(), e.getMessage(), e);
       }
     }
+
     // Close the remaining inited Inputs.
-    Iterator<String> srcVertexItr = initializedInputs.keySet().iterator();
-    while (srcVertexItr.hasNext()) {
-      String srcVertexName = srcVertexItr.next();
+    Iterator<Map.Entry<String, LogicalInput>> inputIterator = initializedInputs.entrySet().iterator();
+    while (inputIterator.hasNext()) {
+      Map.Entry<String, LogicalInput> entry = inputIterator.next();
+      String srcVertexName = entry.getKey();
+      inputIterator.remove();
       try {
-        srcVertexItr.remove();
-
-        initializedInputs.remove(srcVertexName);
-        ((InputFrameworkInterface) initializedInputs.get(srcVertexName)).close();
-
+        ((InputFrameworkInterface)entry.getValue()).close();
         maybeResetInterruptStatus();
       } catch (InterruptedException ie) {
         //reset the status
@@ -828,7 +829,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
             srcVertexName);
         Thread.currentThread().interrupt();
       } catch (Throwable e) {
-        LOG.warn("Exception when closing input in cleanup(interrupted)", e);
+        LOG.warn(
+            "Ignoring exception when closing input {}(cleanup). Exception class={}, message={}",
+            srcVertexName, e.getClass().getName(), e.getMessage(), e);
       } finally {
         LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
             .getContext().getTaskVertexName(), srcVertexName, Thread.currentThread()
@@ -837,32 +840,28 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     }
 
     // Close the remaining inited Outputs.
-    try {
-      Iterator<String> outVertexItr = initializedOutputs.keySet().iterator();
-      while (outVertexItr.hasNext()) {
-        String destVertexName = outVertexItr.next();
-        try {
-          outVertexItr.remove();
-
-          initializedOutputs.remove(destVertexName);
-          ((OutputFrameworkInterface) initializedOutputs.get(destVertexName)).close();
-
-          maybeResetInterruptStatus();
-        } catch (InterruptedException ie) {
-          //reset the status
-          LOG.info("Resetting interrupt status for output with destVertexName={}",
-              destVertexName);
-          Thread.currentThread().interrupt();
-        } catch (Throwable e) {
-          LOG.warn("Exception when closing output in cleanup(interrupted)", e);
-        } finally {
-          LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
-              .getContext().getTaskVertexName(), destVertexName, Thread.currentThread()
-              .isInterrupted());
-        }
+    Iterator<Map.Entry<String, LogicalOutput>> outputIterator = initializedOutputs.entrySet().iterator();
+    while (outputIterator.hasNext()) {
+      Map.Entry<String, LogicalOutput> entry = outputIterator.next();
+      String destVertexName = entry.getKey();
+      outputIterator.remove();
+      try {
+        ((OutputFrameworkInterface) entry.getValue()).close();
+        maybeResetInterruptStatus();
+      } catch (InterruptedException ie) {
+        //reset the status
+        LOG.info("Resetting interrupt status for output with destVertexName={}",
+            destVertexName);
+        Thread.currentThread().interrupt();
+      } catch (Throwable e) {
+        LOG.warn(
+            "Ignoring exception when closing output {}(cleanup). Exception class={}, message={}",
+            destVertexName, e.getClass().getName(), e.getMessage(), e);
+      } finally {
+        LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
+            .getContext().getTaskVertexName(), destVertexName, Thread.currentThread()
+            .isInterrupted());
       }
-    } catch (Throwable e) {
-      LOG.warn(Throwables.getStackTraceAsString(e));
     }
 
     if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/5f965323/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index 7b09455..316a138 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -157,5 +157,5 @@ public abstract class RuntimeTask {
     taskDone.set(true);
   }
 
-  public abstract void abortTask() throws Exception;
+  public abstract void abortTask();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5f965323/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
index 7315bbd..ab77635 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
@@ -63,26 +63,26 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas
           if (stopRequested.get() || Thread.currentThread().isInterrupted()) {
             return new TaskRunner2CallableResult(null);
           }
-          LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID());
+          LOG.info("Initializing task" + ", taskAttemptId={}", task.getTaskAttemptID());
           task.initialize();
 
           if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) {
-            LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID());
+            LOG.info("Running task, taskAttemptId={}", task.getTaskAttemptID());
             task.run();
           } else {
-            LOG.info("Stopped before running the processor.");
+            LOG.info("Stopped before running the processor taskAttemptId={}", task.getTaskAttemptID());
             return new TaskRunner2CallableResult(null);
           }
 
           if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) {
-            LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID());
+            LOG.info("Closing task, taskAttemptId={}", task.getTaskAttemptID());
             task.close();
             task.setFrameworkCounters();
           } else {
-            LOG.info("Stopped before closing the processor");
+            LOG.info("Stopped before closing the processor, taskAttemptId={}", task.getTaskAttemptID());
             return new TaskRunner2CallableResult(null);
           }
-          LOG.info("Task completed, taskAttemptId=" + task.getTaskAttemptID() + ", askedToStop=" + stopRequested.get());
+          LOG.info("Task completed, taskAttemptId={}, askedToStop={}", task.getTaskAttemptID(), stopRequested.get());
 
 
           return new TaskRunner2CallableResult(null);
@@ -115,6 +115,7 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas
   public void interruptTask() {
     // Ensure the task is only interrupted once.
     if (!stopRequested.getAndSet(true)) {
+      task.abortTask();
       if (ownThread != null) {
         ownThread.interrupt();
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/5f965323/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index 73e5c76..ffbc6e8 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -250,10 +250,12 @@ public class TezTaskRunner2 {
   public void killTask() {
     synchronized (this) {
       if (isRunningState()) {
-        trySettingEndReason(EndReason.KILL_REQUESTED);
-        if (taskRunnerCallable != null) {
-          taskKillStartTime = System.currentTimeMillis();
-          taskRunnerCallable.interruptTask();
+        if (trySettingEndReason(EndReason.KILL_REQUESTED)) {
+          killTaskRequested.set(true);
+          if (taskRunnerCallable != null) {
+            taskKillStartTime = System.currentTimeMillis();
+            taskRunnerCallable.interruptTask();
+          }
         }
       }
     }