You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/12 23:28:04 UTC
tez git commit: TEZ-2443. TaskRunner2 should call abort,
NPEs while cleaning up tasks. (sseth)
Repository: tez
Updated Branches:
refs/heads/TEZ-2003 b787648ed -> 5f9653233
TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5f965323
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5f965323
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5f965323
Branch: refs/heads/TEZ-2003
Commit: 5f96532332eda1246412c3e0af3661aa9179ce96
Parents: b787648
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue May 12 14:27:42 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue May 12 14:27:42 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../apache/tez/dag/api/TaskCommunicator.java | 4 ++
.../runtime/LogicalIOProcessorRuntimeTask.java | 75 ++++++++++----------
.../org/apache/tez/runtime/RuntimeTask.java | 2 +-
.../tez/runtime/task/TaskRunner2Callable.java | 13 ++--
.../apache/tez/runtime/task/TezTaskRunner2.java | 10 +--
6 files changed, 56 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/5f965323/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 5d2e40a..ed72d6b 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -25,5 +25,6 @@ ALL CHANGES:
TEZ-2433. Fixes after rebase 05/08
TEZ-2438. tez-tools version in the branch is incorrect.
TEZ-2434. Allow tasks to be killed in the Runtime.
+ TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/5f965323/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index cadca0c..2651013 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -48,6 +48,10 @@ public abstract class TaskCommunicator extends AbstractService {
// TODO TEZ-2003. Are additional APIs required to mark a container as completed ? - for completeness.
// TODO TEZ-2003 Remove reference to TaskAttemptID
+ // TODO TEZ-2003 This needs some information about why the attempt is being unregistered,
+ // e.g. because it was preempted (in which case the task may need to be informed), or
+ // because the task failed.
+ // In case of preemption, a killTask API is likely a better bet than trying to overload this method.
public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID);
// TODO TEZ-2003 This doesn't necessarily belong here. A server may not start within the AM.
http://git-wip-us.apache.org/repos/asf/tez/blob/5f965323/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 9a0e397..f32da76 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -119,7 +119,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
final ConcurrentHashMap<String, LogicalInput> initializedInputs;
final ConcurrentHashMap<String, LogicalOutput> initializedOutputs;
- private boolean processorClosed;
+ private boolean processorClosed = false;
final ProcessorDescriptor processorDescriptor;
AbstractLogicalIOProcessor processor;
ProcessorContext processorContext;
@@ -693,7 +693,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
@Override
- public synchronized void abortTask() throws Exception {
+ public synchronized void abortTask() {
if (processor != null) {
processor.abort();
}
@@ -791,7 +791,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
LOG.debug("Num of inputs to be closed={}", initializedInputs.size());
LOG.debug("Num of outputs to be closed={}", initializedOutputs.size());
}
- if (!processorClosed) {
+ if (!processorClosed && processor != null) {
try {
processorClosed = true;
processor.close();
@@ -808,19 +808,20 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
LOG.info("Resetting interrupt for processor");
Thread.currentThread().interrupt();
} catch (Throwable e) {
- LOG.warn("Exception when closing processor", e);
+ LOG.warn(
+ "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}" +
+ e.getClass().getName(), e.getMessage(), e);
}
}
+
// Close the remaining inited Inputs.
- Iterator<String> srcVertexItr = initializedInputs.keySet().iterator();
- while (srcVertexItr.hasNext()) {
- String srcVertexName = srcVertexItr.next();
+ Iterator<Map.Entry<String, LogicalInput>> inputIterator = initializedInputs.entrySet().iterator();
+ while (inputIterator.hasNext()) {
+ Map.Entry<String, LogicalInput> entry = inputIterator.next();
+ String srcVertexName = entry.getKey();
+ inputIterator.remove();
try {
- srcVertexItr.remove();
-
- initializedInputs.remove(srcVertexName);
- ((InputFrameworkInterface) initializedInputs.get(srcVertexName)).close();
-
+ ((InputFrameworkInterface)entry.getValue()).close();
maybeResetInterruptStatus();
} catch (InterruptedException ie) {
//reset the status
@@ -828,7 +829,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
srcVertexName);
Thread.currentThread().interrupt();
} catch (Throwable e) {
- LOG.warn("Exception when closing input in cleanup(interrupted)", e);
+ LOG.warn(
+ "Ignoring exception when closing input {}(cleanup). Exception class={}, message={}",
+ srcVertexName, e.getClass().getName(), e.getMessage(), e);
} finally {
LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
.getContext().getTaskVertexName(), srcVertexName, Thread.currentThread()
@@ -837,32 +840,28 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
// Close the remaining inited Outputs.
- try {
- Iterator<String> outVertexItr = initializedOutputs.keySet().iterator();
- while (outVertexItr.hasNext()) {
- String destVertexName = outVertexItr.next();
- try {
- outVertexItr.remove();
-
- initializedOutputs.remove(destVertexName);
- ((OutputFrameworkInterface) initializedOutputs.get(destVertexName)).close();
-
- maybeResetInterruptStatus();
- } catch (InterruptedException ie) {
- //reset the status
- LOG.info("Resetting interrupt status for output with destVertexName={}",
- destVertexName);
- Thread.currentThread().interrupt();
- } catch (Throwable e) {
- LOG.warn("Exception when closing output in cleanup(interrupted)", e);
- } finally {
- LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
- .getContext().getTaskVertexName(), destVertexName, Thread.currentThread()
- .isInterrupted());
- }
+ Iterator<Map.Entry<String, LogicalOutput>> outputIterator = initializedOutputs.entrySet().iterator();
+ while (outputIterator.hasNext()) {
+ Map.Entry<String, LogicalOutput> entry = outputIterator.next();
+ String destVertexName = entry.getKey();
+ outputIterator.remove();
+ try {
+ ((OutputFrameworkInterface) entry.getValue()).close();
+ maybeResetInterruptStatus();
+ } catch (InterruptedException ie) {
+ //reset the status
+ LOG.info("Resetting interrupt status for output with destVertexName={}",
+ destVertexName);
+ Thread.currentThread().interrupt();
+ } catch (Throwable e) {
+ LOG.warn(
+ "Ignoring exception when closing output {}(cleanup). Exception class={}, message={}",
+ destVertexName, e.getClass().getName(), e.getMessage(), e);
+ } finally {
+ LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
+ .getContext().getTaskVertexName(), destVertexName, Thread.currentThread()
+ .isInterrupted());
}
- } catch (Throwable e) {
- LOG.warn(Throwables.getStackTraceAsString(e));
}
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/5f965323/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index 7b09455..316a138 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -157,5 +157,5 @@ public abstract class RuntimeTask {
taskDone.set(true);
}
- public abstract void abortTask() throws Exception;
+ public abstract void abortTask();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5f965323/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
index 7315bbd..ab77635 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
@@ -63,26 +63,26 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas
if (stopRequested.get() || Thread.currentThread().isInterrupted()) {
return new TaskRunner2CallableResult(null);
}
- LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID());
+ LOG.info("Initializing task" + ", taskAttemptId={}", task.getTaskAttemptID());
task.initialize();
if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) {
- LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID());
+ LOG.info("Running task, taskAttemptId={}", task.getTaskAttemptID());
task.run();
} else {
- LOG.info("Stopped before running the processor.");
+ LOG.info("Stopped before running the processor taskAttemptId={}", task.getTaskAttemptID());
return new TaskRunner2CallableResult(null);
}
if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) {
- LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID());
+ LOG.info("Closing task, taskAttemptId={}", task.getTaskAttemptID());
task.close();
task.setFrameworkCounters();
} else {
- LOG.info("Stopped before closing the processor");
+ LOG.info("Stopped before closing the processor, taskAttemptId={}", task.getTaskAttemptID());
return new TaskRunner2CallableResult(null);
}
- LOG.info("Task completed, taskAttemptId=" + task.getTaskAttemptID() + ", askedToStop=" + stopRequested.get());
+ LOG.info("Task completed, taskAttemptId={}, askedToStop={}", task.getTaskAttemptID(), stopRequested.get());
return new TaskRunner2CallableResult(null);
@@ -115,6 +115,7 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas
public void interruptTask() {
// Ensure the task is only interrupted once.
if (!stopRequested.getAndSet(true)) {
+ task.abortTask();
if (ownThread != null) {
ownThread.interrupt();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5f965323/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index 73e5c76..ffbc6e8 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -250,10 +250,12 @@ public class TezTaskRunner2 {
public void killTask() {
synchronized (this) {
if (isRunningState()) {
- trySettingEndReason(EndReason.KILL_REQUESTED);
- if (taskRunnerCallable != null) {
- taskKillStartTime = System.currentTimeMillis();
- taskRunnerCallable.interruptTask();
+ if (trySettingEndReason(EndReason.KILL_REQUESTED)) {
+ killTaskRequested.set(true);
+ if (taskRunnerCallable != null) {
+ taskKillStartTime = System.currentTimeMillis();
+ taskRunnerCallable.interruptTask();
+ }
}
}
}