You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/14 22:58:24 UTC
[06/50] [abbrv] tez git commit: TEZ-2414.
LogicalIOProcessorRuntimeTask, RuntimeTask,
TezTaskRunner should handle interrupts & carry out necessary cleanups.
Contributed by Rajesh Balamohan.
TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups. Contributed by Rajesh Balamohan.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2fc431d9
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2fc431d9
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2fc431d9
Branch: refs/heads/TEZ-2003
Commit: 2fc431d90919f3840fde1bf2786c21da9983bd01
Parents: 619aaf3
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed May 6 00:39:46 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 14 13:46:43 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../runtime/LogicalIOProcessorRuntimeTask.java | 83 ++++++++++++++++++--
.../org/apache/tez/runtime/RuntimeTask.java | 5 ++
.../apache/tez/runtime/task/TezTaskRunner.java | 71 ++++++++++++++++-
4 files changed, 152 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/2fc431d9/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 9fc9ed3..f8a71e8 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -20,5 +20,6 @@ ALL CHANGES:
TEZ-2361. Propagate dag completion to TaskCommunicator.
TEZ-2381. Fixes after rebase 04/28.
TEZ-2388. Send dag identifier as part of the fetcher request string.
+ TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/2fc431d9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 84e5e0d..8263b3f 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -20,6 +20,9 @@ package org.apache.tez.runtime;
import java.io.Closeable;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -41,6 +44,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.base.Throwables;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.tez.runtime.api.TaskContext;
import org.apache.tez.runtime.api.impl.TezProcessorContextImpl;
@@ -174,6 +178,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
this.runInputMap = new LinkedHashMap<String, LogicalInput>();
this.runOutputMap = new LinkedHashMap<String, LogicalOutput>();
+ this.initializedInputs = new ConcurrentHashMap<String, LogicalInput>();
+ this.initializedOutputs = new ConcurrentHashMap<String, LogicalOutput>();
+
this.processorDescriptor = taskSpec.getProcessorDescriptor();
this.serviceConsumerMetadata = serviceConsumerMetadata;
this.envMap = envMap;
@@ -420,6 +427,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
taskSpec.getTaskAttemptID());
initializedInputs.put(edgeName, input);
LOG.info("Initialized Input with src edge: " + edgeName);
+ initializedInputs.put(edgeName, input);
return null;
}
}
@@ -469,6 +477,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
outputContext.getDestinationVertexName(), taskSpec.getTaskAttemptID());
initializedOutputs.put(edgeName, output);
LOG.info("Initialized Output with dest edge: " + edgeName);
+ initializedOutputs.put(edgeName, output);
return null;
}
}
@@ -694,6 +703,13 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
eventsToBeProcessed.addAll(events);
}
+ /**
+ * Forwards an abort request to the processor. Does nothing while the processor
+ * reference is still null.
+ */
+ @Override
+ public synchronized void abortTask() throws Exception {
+ if (processor == null) {
+ return;
+ }
+ processor.abort();
+ }
+
private void startRouterThread() {
eventRouterThread = new Thread(new RunnableWithNdc() {
public void runInternal() {
@@ -713,6 +729,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
if (!isTaskDone()) {
LOG.warn("Event Router thread interrupted. Returning.");
}
+ Thread.currentThread().interrupt();
return;
}
}
@@ -724,6 +741,12 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
eventRouterThread.start();
}
+ /**
+ * Ensures the current thread's interrupt flag is set.
+ * Despite the name, this never clears the flag: it raises it when it is not already raised.
+ * NOTE(review): cleanup calls this after every close() that completes normally, so the
+ * flag ends up set even if it was clear before the close - confirm this is intended.
+ */
+ private void maybeResetInterruptStatus() {
+ Thread current = Thread.currentThread();
+ if (current.isInterrupted()) {
+ return;
+ }
+ current.interrupt();
+ }
+
private void closeContexts() throws IOException {
closeContext(inputContextMap);
closeContext(outputContextMap);
@@ -763,6 +786,18 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
// Close the unclosed IPO
+ /**
+ * Cleanup IPO that are not closed. In case, regular close() has happened in IPO, they
+ * would not be available in the IPOs to be cleaned. So this is safe.
+ *
+ * e.g whenever input gets closed() in normal way, it automatically removes it from
+ * initializedInputs map.
+ *
+ * In case any exception happens in processor close or IO close, it wouldn't be removed from
+ * the initialized IO data structures and here is the chance to close them and release
+ * resources.
+ *
+ */
if (LOG.isDebugEnabled()) {
LOG.debug("Processor closed={}", processorClosed);
LOG.debug("Num of inputs to be closed={}", initializedInputs.size());
@@ -773,10 +808,16 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
try {
processorClosed = true;
processor.close();
- LOG.info("Closed processor for vertex={}, index={}",
+ LOG.info("Closed processor for vertex={}, index={}, interruptedStatus={}",
processor
.getContext().getTaskVertexName(),
- processor.getContext().getTaskVertexIndex());
+ processor.getContext().getTaskVertexIndex(),
+ Thread.currentThread().isInterrupted());
+ maybeResetInterruptStatus();
+ } catch (InterruptedException ie) {
+ //reset the status
+ LOG.info("Resetting interrupt for processor");
+ Thread.currentThread().interrupt();
} catch (Throwable e) {
LOG.warn(
"Ignoring Exception when closing processor(cleanup). Exception class={}, message={}",
@@ -792,13 +833,19 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
inputIterator.remove();
try {
((InputFrameworkInterface)entry.getValue()).close();
+ maybeResetInterruptStatus();
+ } catch (InterruptedException ie) {
+ //reset the status
+ LOG.info("Resetting interrupt status for input with srcVertexName={}",
+ srcVertexName);
+ Thread.currentThread().interrupt();
} catch (Throwable e) {
LOG.warn(
"Ignoring exception when closing input {}(cleanup). Exception class={}, message={}",
srcVertexName, e.getClass().getName(), e.getMessage());
} finally {
- LOG.info("Close input for vertex={}, sourceVertex={}", processor
- .getContext().getTaskVertexName(), srcVertexName);
+ LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
+ .getContext().getTaskVertexName(), srcVertexName, Thread.currentThread().isInterrupted());
}
}
@@ -810,16 +857,26 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
outputIterator.remove();
try {
((OutputFrameworkInterface) entry.getValue()).close();
+ maybeResetInterruptStatus();
+ } catch (InterruptedException ie) {
+ //reset the status
+ LOG.info("Resetting interrupt status for output with destVertexName={}",
+ destVertexName);
+ Thread.currentThread().interrupt();
} catch (Throwable e) {
LOG.warn(
"Ignoring exception when closing output {}(cleanup). Exception class={}, message={}",
destVertexName, e.getClass().getName(), e.getMessage());
} finally {
- LOG.info("Close input for vertex={}, sourceVertex={}", processor
- .getContext().getTaskVertexName(), destVertexName);
+ LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
+ .getContext().getTaskVertexName(), destVertexName, Thread.currentThread().isInterrupted());
}
}
+ if (LOG.isDebugEnabled()) {
+ printThreads();
+ }
+
try {
closeContexts();
// Cleanup references which may be held by misbehaved tasks.
@@ -867,6 +924,20 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
inputReadyTracker = null;
objectRegistry = null;
}
+
+
+ /**
+ * Logs the id and name of every live thread in the JVM. Intended for debugging only.
+ * Threads that terminate between the id snapshot and the lookup are reported as
+ * "<terminated>" instead of causing a NullPointerException.
+ */
+ void printThreads() {
+ ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+ // Primitive loop variable: avoids boxing every element of the long[] id array.
+ for (long id : threadMXBean.getAllThreadIds()) {
+ // getThreadInfo returns null when the thread is no longer alive.
+ ThreadInfo threadInfo = threadMXBean.getThreadInfo(id);
+ String name = (threadInfo == null) ? "<terminated>" : threadInfo.getThreadName();
+ LOG.info("ThreadId : {}, name={}", id, name);
+ }
+ }
@Private
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/tez/blob/2fc431d9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index 17d7053..cdfb46a 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -76,6 +76,10 @@ public abstract class RuntimeTask {
protected final AtomicReference<State> state = new AtomicReference<State>();
+ /**
+ * @return true when the task's current state is {@link State#RUNNING}.
+ */
+ public boolean isRunning() {
+ State current = state.get();
+ return current == State.RUNNING;
+ }
+
public TezCounters addAndGetTezCounter(String name) {
TezCounters counter = new TezCounters();
counterMap.put(name, counter);
@@ -163,4 +167,5 @@ public abstract class RuntimeTask {
taskDone.set(true);
}
+ /**
+ * Requests that this task abort its work. Implementations may throw if the
+ * abort cannot be carried out.
+ */
+ public abstract void abortTask() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/2fc431d9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
index 33a7f4a..7238d5e 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -25,8 +25,13 @@ import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.base.Throwables;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSError;
@@ -35,6 +40,7 @@ import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.RuntimeTask;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.ObjectRegistry;
import org.apache.tez.runtime.api.impl.EventMetaData;
@@ -61,6 +67,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
private final ListeningExecutorService executor;
private volatile ListenableFuture<Void> taskFuture;
private volatile Thread waitingThread;
+ private volatile Thread taskRunner;
private volatile Throwable firstException;
// Effectively a duplicate check, since hadFatalError does the same thing.
@@ -96,7 +103,10 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
taskReporter.registerTask(task, this);
TaskRunnerCallable callable = new TaskRunnerCallable();
Throwable failureCause = null;
- taskFuture = executor.submit(callable);
+ if (!Thread.currentThread().isInterrupted()) {
+ taskFuture = executor.submit(callable);
+ return isShutdownRequested();
+ }
try {
taskFuture.get();
@@ -158,6 +168,10 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
}
}
}
+ return isShutdownRequested();
+ }
+
+ private boolean isShutdownRequested() {
if (shutdownRequested.get()) {
LOG.info("Shutdown requested... returning");
return false;
@@ -173,11 +187,14 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
@Override
public Void run() throws Exception {
try {
+ taskRunner = Thread.currentThread();
LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID());
task.initialize();
if (!Thread.currentThread().isInterrupted() && firstException == null) {
LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID());
task.run();
+ maybeInterruptWaitingThread();
+
LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID());
task.close();
task.setFrameworkCounters();
@@ -199,6 +216,12 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
}
return null;
} catch (Throwable cause) {
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.info("TaskRunnerCallable interrupted=" + Thread.currentThread().isInterrupted()
+ + ", shutdownRequest=" + shutdownRequested.get());
+ Thread.currentThread().interrupt();
+ return null;
+ }
if (cause instanceof FSError) {
// Not immediately fatal, this is an error reported by Hadoop FileSystem
maybeRegisterFirstException(cause);
@@ -255,6 +278,17 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
taskRunning.set(false);
}
}
+
+ /**
+ * Interrupts waitingThread when a shutdown has been requested.
+ * The processor may swallow the InterruptedException raised by taskRunner.interrupt();
+ * interrupting waitingThread as well ensures the whole task still gets cancelled.
+ */
+ private void maybeInterruptWaitingThread() {
+ if (!shutdownRequested.get()) {
+ return;
+ }
+ waitingThread.interrupt();
+ }
}
// should wait until all messages are sent to AM before TezChild shutdown
@@ -353,10 +387,43 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
}
}
+ /**
+ * Aborts the task in response to a shutdown request.
+ *
+ * If taskRunning is not set, only waitingThread is interrupted. Otherwise the task is asked
+ * to abort; a failure to abort is logged and reported via sendFailure. Afterwards the
+ * taskRunner thread is interrupted if the task reports RUNNING, else waitingThread is.
+ * The volatile thread fields are read once into locals and null-checked so a shutdown that
+ * races with startup cannot throw a NullPointerException.
+ */
+ private void abortRunningTask() {
+ if (!taskRunning.get()) {
+ LOG.info("Task is not running");
+ Thread waiting = waitingThread;
+ if (waiting != null) {
+ waiting.interrupt();
+ }
+ return;
+ }
+
+ // taskRunning was just observed true; re-reading it here would only add a race window.
+ try {
+ task.abortTask();
+ } catch (Exception e) {
+ LOG.warn("Error when aborting the task", e);
+ try {
+ sendFailure(e, "Error when aborting the task");
+ } catch (Exception ignored) {
+ // Best effort: a failure to report must not prevent the interrupt below.
+ }
+ }
+
+ // Interrupt the relevant threads. TaskRunner should be interrupted preferably.
+ Thread runner = taskRunner;
+ if (isTaskRunning() && runner != null) {
+ LOG.info("Interrupting taskRunner=" + runner.getName());
+ runner.interrupt();
+ } else {
+ Thread waiting = waitingThread;
+ if (waiting != null) {
+ LOG.info("Interrupting waitingThread=" + waiting.getName());
+ waiting.interrupt();
+ }
+ }
+ }
+
+ /**
+ * @return true only when taskRunning is set and the task itself reports isRunning().
+ */
+ private boolean isTaskRunning() {
+ boolean runnerActive = taskRunning.get();
+ return runnerActive && task.isRunning();
+ }
+
+ /**
+ * Records that a shutdown was requested and aborts the running task.
+ */
@Override
public void shutdownRequested() {
shutdownRequested.set(true);
- waitingThread.interrupt();
+ abortRunningTask();
}
private String getTaskDiagnosticsString(Throwable t, String message) {