You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/22 09:26:13 UTC
[14/50] [abbrv] tez git commit: TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks. (sseth)
TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3d795d62
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3d795d62
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3d795d62
Branch: refs/heads/master
Commit: 3d795d62c0d471ff866adb9002e8c6fe6ea9536a
Parents: 7cf416a
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue May 12 14:27:42 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 21 18:13:55 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../java/org/apache/tez/dag/api/TaskCommunicator.java | 4 ++++
.../tez/runtime/LogicalIOProcessorRuntimeTask.java | 11 ++++++-----
.../main/java/org/apache/tez/runtime/RuntimeTask.java | 2 +-
.../apache/tez/runtime/task/TaskRunner2Callable.java | 13 +++++++------
.../org/apache/tez/runtime/task/TezTaskRunner2.java | 10 ++++++----
6 files changed, 25 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/3d795d62/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 5d2e40a..ed72d6b 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -25,5 +25,6 @@ ALL CHANGES:
TEZ-2433. Fixes after rebase 05/08
TEZ-2438. tez-tools version in the branch is incorrect.
TEZ-2434. Allow tasks to be killed in the Runtime.
+ TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/3d795d62/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index cadca0c..2651013 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -48,6 +48,10 @@ public abstract class TaskCommunicator extends AbstractService {
// TODO TEZ-2003. Are additional APIs required to mark a container as completed ? - for completeness.
// TODO TEZ-2003 Remove reference to TaskAttemptID
+ // TODO TEZ-2003 This needs some information about why the attempt is being unregistered.
+ // e.g. preempted in which case the task may need to be informed. Alternately as a result of
+ // a failed task.
+ // In case of preemption - a killTask API is likely a better bet than trying to overload this method.
public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID);
// TODO TEZ-2003 This doesn't necessarily belong here. A server may not start within the AM.
http://git-wip-us.apache.org/repos/asf/tez/blob/3d795d62/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 8263b3f..de08e56 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -704,7 +704,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
@Override
- public synchronized void abortTask() throws Exception {
+ public synchronized void abortTask() {
if (processor != null) {
processor.abort();
}
@@ -803,6 +803,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
LOG.debug("Num of inputs to be closed={}", initializedInputs.size());
LOG.debug("Num of outputs to be closed={}", initializedOutputs.size());
}
+
// Close processor
if (!processorClosed && processor != null) {
try {
@@ -820,8 +821,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
Thread.currentThread().interrupt();
} catch (Throwable e) {
LOG.warn(
- "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}",
- e.getClass().getName(), e.getMessage());
+ "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}" +
+ e.getClass().getName(), e.getMessage(), e);
}
}
@@ -842,7 +843,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
} catch (Throwable e) {
LOG.warn(
"Ignoring exception when closing input {}(cleanup). Exception class={}, message={}",
- srcVertexName, e.getClass().getName(), e.getMessage());
+ srcVertexName, e.getClass().getName(), e.getMessage(), e);
} finally {
LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
.getContext().getTaskVertexName(), srcVertexName, Thread.currentThread().isInterrupted());
@@ -866,7 +867,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
} catch (Throwable e) {
LOG.warn(
"Ignoring exception when closing output {}(cleanup). Exception class={}, message={}",
- destVertexName, e.getClass().getName(), e.getMessage());
+ destVertexName, e.getClass().getName(), e.getMessage(), e);
} finally {
LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
.getContext().getTaskVertexName(), destVertexName, Thread.currentThread().isInterrupted());
http://git-wip-us.apache.org/repos/asf/tez/blob/3d795d62/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index cdfb46a..33c0113 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -167,5 +167,5 @@ public abstract class RuntimeTask {
taskDone.set(true);
}
- public abstract void abortTask() throws Exception;
+ public abstract void abortTask();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3d795d62/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
index 7315bbd..ab77635 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
@@ -63,26 +63,26 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas
if (stopRequested.get() || Thread.currentThread().isInterrupted()) {
return new TaskRunner2CallableResult(null);
}
- LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID());
+ LOG.info("Initializing task" + ", taskAttemptId={}", task.getTaskAttemptID());
task.initialize();
if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) {
- LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID());
+ LOG.info("Running task, taskAttemptId={}", task.getTaskAttemptID());
task.run();
} else {
- LOG.info("Stopped before running the processor.");
+ LOG.info("Stopped before running the processor taskAttemptId={}", task.getTaskAttemptID());
return new TaskRunner2CallableResult(null);
}
if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) {
- LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID());
+ LOG.info("Closing task, taskAttemptId={}", task.getTaskAttemptID());
task.close();
task.setFrameworkCounters();
} else {
- LOG.info("Stopped before closing the processor");
+ LOG.info("Stopped before closing the processor, taskAttemptId={}", task.getTaskAttemptID());
return new TaskRunner2CallableResult(null);
}
- LOG.info("Task completed, taskAttemptId=" + task.getTaskAttemptID() + ", askedToStop=" + stopRequested.get());
+ LOG.info("Task completed, taskAttemptId={}, askedToStop={}", task.getTaskAttemptID(), stopRequested.get());
return new TaskRunner2CallableResult(null);
@@ -115,6 +115,7 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas
public void interruptTask() {
// Ensure the task is only interrupted once.
if (!stopRequested.getAndSet(true)) {
+ task.abortTask();
if (ownThread != null) {
ownThread.interrupt();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3d795d62/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index 73e5c76..ffbc6e8 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -250,10 +250,12 @@ public class TezTaskRunner2 {
public void killTask() {
synchronized (this) {
if (isRunningState()) {
- trySettingEndReason(EndReason.KILL_REQUESTED);
- if (taskRunnerCallable != null) {
- taskKillStartTime = System.currentTimeMillis();
- taskRunnerCallable.interruptTask();
+ if (trySettingEndReason(EndReason.KILL_REQUESTED)) {
+ killTaskRequested.set(true);
+ if (taskRunnerCallable != null) {
+ taskKillStartTime = System.currentTimeMillis();
+ taskRunnerCallable.interruptTask();
+ }
}
}
}