You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:43:20 UTC
[42/43] tez git commit: TEZ-2414. LogicalIOProcessorRuntimeTask,
RuntimeTask,
TezTaskRunner should handle interrupts & carry out necessary cleanups.
Contributed by Rajesh Balamohan.
TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups. Contributed by Rajesh Balamohan.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ea972acd
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ea972acd
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ea972acd
Branch: refs/heads/TEZ-2003
Commit: ea972acdc2a48c763596d34acb53577279eb00f0
Parents: fdb9177
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed May 6 00:39:46 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 8 14:46:10 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../runtime/LogicalIOProcessorRuntimeTask.java | 141 +++++++++++++++++++
.../org/apache/tez/runtime/RuntimeTask.java | 5 +
.../apache/tez/runtime/task/TezTaskRunner.java | 71 +++++++++-
4 files changed, 216 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ea972acd/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 9fc9ed3..f8a71e8 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -20,5 +20,6 @@ ALL CHANGES:
TEZ-2361. Propagate dag completion to TaskCommunicator.
TEZ-2381. Fixes after rebase 04/28.
TEZ-2388. Send dag identifier as part of the fetcher request string.
+ TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/ea972acd/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 1cfe538..48c972c 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -20,10 +20,14 @@ package org.apache.tez.runtime;
import java.io.Closeable;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -38,6 +42,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.base.Throwables;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.tez.runtime.api.TaskContext;
import org.apache.tez.runtime.api.impl.TezProcessorContextImpl;
@@ -111,6 +116,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
final List<GroupInputSpec> groupInputSpecs;
ConcurrentHashMap<String, MergedLogicalInput> groupInputsMap;
+ final ConcurrentHashMap<String, LogicalInput> initializedInputs; // inputs initialized but not yet closed
+ final ConcurrentHashMap<String, LogicalOutput> initializedOutputs; // outputs initialized but not yet closed
+
+ private boolean processorClosed;
final ProcessorDescriptor processorDescriptor;
AbstractLogicalIOProcessor processor;
ProcessorContext processorContext;
@@ -163,6 +172,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
this.runInputMap = new LinkedHashMap<String, LogicalInput>();
this.runOutputMap = new LinkedHashMap<String, LogicalOutput>();
+ this.initializedInputs = new ConcurrentHashMap<String, LogicalInput>();
+ this.initializedOutputs = new ConcurrentHashMap<String, LogicalOutput>();
+
this.processorDescriptor = taskSpec.getProcessorDescriptor();
this.serviceConsumerMetadata = serviceConsumerMetadata;
this.envMap = envMap;
@@ -341,11 +353,13 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
this.state.set(State.CLOSED);
// Close the Processor.
+ processorClosed = true;
processor.close();
// Close the Inputs.
for (InputSpec inputSpec : inputSpecs) {
String srcVertexName = inputSpec.getSourceVertexName();
+ initializedInputs.remove(srcVertexName);
List<Event> closeInputEvents = ((InputFrameworkInterface)inputsMap.get(srcVertexName)).close();
sendTaskGeneratedEvents(closeInputEvents,
EventProducerConsumerType.INPUT, taskSpec.getVertexName(),
@@ -355,6 +369,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
// Close the Outputs.
for (OutputSpec outputSpec : outputSpecs) {
String destVertexName = outputSpec.getDestinationVertexName();
+ initializedOutputs.remove(destVertexName);
List<Event> closeOutputEvents = ((LogicalOutputFrameworkInterface)outputsMap.get(destVertexName)).close();
sendTaskGeneratedEvents(closeOutputEvents,
EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
@@ -402,6 +417,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
inputContext.getTaskVertexName(), inputContext.getSourceVertexName(),
taskSpec.getTaskAttemptID());
LOG.info("Initialized Input with src edge: " + edgeName);
+ initializedInputs.put(edgeName, input);
return null;
}
}
@@ -450,6 +466,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
outputContext.getTaskVertexName(),
outputContext.getDestinationVertexName(), taskSpec.getTaskAttemptID());
LOG.info("Initialized Output with dest edge: " + edgeName);
+ initializedOutputs.put(edgeName, output);
return null;
}
}
@@ -675,6 +692,13 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
eventsToBeProcessed.addAll(events);
}
+ @Override
+ public synchronized void abortTask() throws Exception { // Delegates to processor.abort() once a processor exists.
+ if (processor != null) {
+ processor.abort();
+ }
+ }
+
private void startRouterThread() {
eventRouterThread = new Thread(new RunnableWithNdc() {
public void runInternal() {
@@ -694,6 +718,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
if (!isTaskDone()) {
LOG.warn("Event Router thread interrupted. Returning.");
}
+ Thread.currentThread().interrupt(); // Restore the interrupt flag cleared by the caught exception.
return;
}
}
@@ -705,6 +730,12 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
eventRouterThread.start();
}
+ private void maybeResetInterruptStatus() { // Sets the current thread's interrupt flag if it is not already set.
+ if (!Thread.currentThread().isInterrupted()) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
private void closeContexts() throws IOException {
closeContext(inputContextMap);
closeContext(outputContextMap);
@@ -742,6 +773,102 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
eventRouterThread = null;
}
+
+ /*
+ * Clean up the processor, inputs and outputs (IPO) that were not closed. If the regular
+ * close() already ran for an IPO, it is no longer tracked here, so this is safe.
+ *
+ * e.g. whenever an input is closed in the normal way, it is first removed from the
+ * initializedInputs map.
+ *
+ * If processor close or an IO close throws, the IOs that were not yet reached remain in
+ * the initialized IO maps; this is the chance to close them and release their
+ * resources.
+ *
+ */
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processor closed={}", processorClosed);
+ LOG.debug("Num of inputs to be closed={}", initializedInputs.size());
+ LOG.debug("Num of outputs to be closed={}", initializedOutputs.size());
+ }
+ if (!processorClosed) {
+ try {
+ processorClosed = true;
+ processor.close();
+
+ LOG.info("Closed processor for vertex={}, index={}, interruptedStatus={}",
+ processor
+ .getContext().getTaskVertexName(),
+ processor.getContext().getTaskVertexIndex(),
+ Thread.currentThread().isInterrupted());
+
+ maybeResetInterruptStatus();
+ } catch (InterruptedException ie) {
+ //reset the status
+ LOG.info("Resetting interrupt for processor");
+ Thread.currentThread().interrupt();
+ } catch (Throwable e) {
+ LOG.warn("Exception when closing processor", e);
+ }
+ }
+ // Close the remaining inited Inputs.
+ Iterator<Map.Entry<String, LogicalInput>> srcVertexItr = initializedInputs.entrySet().iterator();
+ while (srcVertexItr.hasNext()) {
+ Map.Entry<String, LogicalInput> srcEntry = srcVertexItr.next();
+ String srcVertexName = srcEntry.getKey();
+ try {
+ // Remove first so a failing close() is never retried; close the captured value.
+ srcVertexItr.remove();
+ ((InputFrameworkInterface) srcEntry.getValue()).close();
+
+ maybeResetInterruptStatus();
+ } catch (InterruptedException ie) {
+ //reset the status
+ LOG.info("Resetting interrupt status for input with srcVertexName={}",
+ srcVertexName);
+ Thread.currentThread().interrupt();
+ } catch (Throwable e) {
+ LOG.warn("Exception when closing input in cleanup(interrupted)", e);
+ } finally {
+ LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
+ .getContext().getTaskVertexName(), srcVertexName, Thread.currentThread()
+ .isInterrupted());
+ }
+ }
+
+ // Close the remaining inited Outputs.
+ try {
+ Iterator<Map.Entry<String, LogicalOutput>> outVertexItr = initializedOutputs.entrySet().iterator();
+ while (outVertexItr.hasNext()) {
+ Map.Entry<String, LogicalOutput> destEntry = outVertexItr.next();
+ String destVertexName = destEntry.getKey();
+ try {
+ // Remove first so a failing close() is never retried; close the captured value.
+ outVertexItr.remove();
+ ((OutputFrameworkInterface) destEntry.getValue()).close();
+
+ maybeResetInterruptStatus();
+ } catch (InterruptedException ie) {
+ //reset the status
+ LOG.info("Resetting interrupt status for output with destVertexName={}",
+ destVertexName);
+ Thread.currentThread().interrupt();
+ } catch (Throwable e) {
+ LOG.warn("Exception when closing output in cleanup(interrupted)", e);
+ } finally {
+ LOG.info("Close output for vertex={}, destVertex={}, interruptedStatus={}", processor
+ .getContext().getTaskVertexName(), destVertexName, Thread.currentThread()
+ .isInterrupted());
+ }
+ }
+ } catch (Throwable e) {
+ LOG.warn(Throwables.getStackTraceAsString(e));
+ }
+
+ if (LOG.isDebugEnabled()) {
+ printThreads();
+ }
+
try {
closeContexts();
// Cleanup references which may be held by misbehaved tasks.
@@ -785,6 +912,20 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
inputReadyTracker = null;
objectRegistry = null;
}
+
+
+ /**
+ * Print all threads in JVM (only for debugging)
+ */
+ void printThreads() {
+ //Print the status of all threads in JVM
+ ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+ long[] threadIds = threadMXBean.getAllThreadIds();
+ for (long id : threadIds) {
+ ThreadInfo threadInfo = threadMXBean.getThreadInfo(id); // null if the thread died after getAllThreadIds()
+ LOG.info("ThreadId : {}, name={}", id, threadInfo == null ? "<terminated>" : threadInfo.getThreadName());
+ }
+ }
@Private
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/tez/blob/ea972acd/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index 921095c..7b09455 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -74,6 +74,10 @@ public abstract class RuntimeTask {
protected final AtomicReference<State> state = new AtomicReference<State>();
+ public boolean isRunning() { // True only while the task's state is State.RUNNING.
+ return (state.get() == State.RUNNING);
+ }
+
public TezCounters addAndGetTezCounter(String name) {
TezCounters counter = new TezCounters();
counterMap.put(name, counter);
@@ -153,4 +157,5 @@ public abstract class RuntimeTask {
taskDone.set(true);
}
+ public abstract void abortTask() throws Exception; // Asks the concrete task to abort; implementations may throw.
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ea972acd/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
index 33a7f4a..7238d5e 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -25,8 +25,13 @@ import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.base.Throwables;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSError;
@@ -35,6 +40,7 @@ import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.RuntimeTask;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.ObjectRegistry;
import org.apache.tez.runtime.api.impl.EventMetaData;
@@ -61,6 +67,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
private final ListeningExecutorService executor;
private volatile ListenableFuture<Void> taskFuture;
private volatile Thread waitingThread;
+ private volatile Thread taskRunner; // Thread executing TaskRunnerCallable; target of abort interrupts.
private volatile Throwable firstException;
// Effectively a duplicate check, since hadFatalError does the same thing.
@@ -96,7 +103,10 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
taskReporter.registerTask(task, this);
TaskRunnerCallable callable = new TaskRunnerCallable();
Throwable failureCause = null;
- taskFuture = executor.submit(callable);
+ if (Thread.currentThread().isInterrupted()) { // Interrupted before submission: do not start the task.
+ return isShutdownRequested();
+ }
+ taskFuture = executor.submit(callable); // Normal path: submit, then wait on the future below.
try {
taskFuture.get();
@@ -158,6 +168,10 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
}
}
}
+ return isShutdownRequested();
+ }
+
+ private boolean isShutdownRequested() {
if (shutdownRequested.get()) {
LOG.info("Shutdown requested... returning");
return false;
@@ -173,11 +187,14 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
@Override
public Void run() throws Exception {
try {
+ taskRunner = Thread.currentThread();
LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID());
task.initialize();
if (!Thread.currentThread().isInterrupted() && firstException == null) {
LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID());
task.run();
+ maybeInterruptWaitingThread();
+
LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID());
task.close();
task.setFrameworkCounters();
@@ -199,6 +216,12 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
}
return null;
} catch (Throwable cause) {
+ if (Thread.currentThread().isInterrupted()) { // Interrupted (e.g. by an abort): exit without reporting.
+ LOG.info("TaskRunnerCallable interrupted=" + Thread.currentThread().isInterrupted()
+ + ", shutdownRequest=" + shutdownRequested.get());
+ Thread.currentThread().interrupt();
+ return null;
+ }
if (cause instanceof FSError) {
// Not immediately fatal, this is an error reported by Hadoop FileSystem
maybeRegisterFirstException(cause);
@@ -255,6 +278,17 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
taskRunning.set(false);
}
}
+
+ private void maybeInterruptWaitingThread() {
+ /*
+ * The processor may have swallowed the InterruptedException raised by taskRunner.interrupt().
+ * In that case, interrupt the waitingThread based on the shutdownRequested flag, so that
+ * the entire task gets cancelled.
+ */
+ if (shutdownRequested.get()) {
+ waitingThread.interrupt();
+ }
+ }
}
// should wait until all messages are sent to AM before TezChild shutdown
@@ -353,10 +387,43 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
}
}
+ private void abortRunningTask() { // Abort cooperatively via task.abortTask(), then interrupt the busy thread.
+ if (!taskRunning.get()) {
+ LOG.info("Task is not running");
+ waitingThread.interrupt();
+ return;
+ }
+
+ if (taskRunning.get()) {
+ try {
+ task.abortTask();
+ } catch (Exception e) {
+ LOG.warn("Error when aborting the task", e);
+ try {
+ sendFailure(e, "Error when aborting the task");
+ } catch (Exception ignored) {
+ // Ignored.
+ }
+ }
+ }
+ //Interrupt the relevant threads. TaskRunner should be interrupted preferably.
+ if (isTaskRunning()) {
+ LOG.info("Interrupting taskRunner=" + taskRunner.getName());
+ taskRunner.interrupt();
+ } else {
+ LOG.info("Interrupting waitingThread=" + waitingThread.getName());
+ waitingThread.interrupt();
+ }
+ }
+
+ private boolean isTaskRunning() { // Runner still active AND the task's state is RUNNING.
+ return (taskRunning.get() && task.isRunning());
+ }
+
@Override
public void shutdownRequested() {
shutdownRequested.set(true);
- waitingThread.interrupt();
+ abortRunningTask();
}
private String getTaskDiagnosticsString(Throwable t, String message) {