You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/06/16 20:10:34 UTC
tez git commit: TEZ-2475. Fix a potential hang in Tez local mode
caused by incorrectly handled interrupts. (sseth)
Repository: tez
Updated Branches:
refs/heads/master 6ae49e99d -> be7e63b28
TEZ-2475. Fix a potential hang in Tez local mode caused by incorrectly
handled interrupts. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/be7e63b2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/be7e63b2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/be7e63b2
Branch: refs/heads/master
Commit: be7e63b28b0193dd06d96e73d2a308c5d5b5ef17
Parents: 6ae49e9
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Jun 16 11:10:28 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Jun 16 11:10:28 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../app/launcher/LocalContainerLauncher.java | 41 +++++++++++++-------
.../tez/runtime/task/ContainerReporter.java | 4 +-
.../org/apache/tez/runtime/task/TezChild.java | 27 ++++++++++---
4 files changed, 51 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/be7e63b2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1d5d095..cca47fd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -248,6 +248,7 @@ Release 0.6.2: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2475. Fix a potential hang in Tez local mode caused by incorrectly handled interrupts.
TEZ-2548. TezClient submitDAG can hang if the AM is in the process of shutting down.
TEZ-2534. Error handling summary event when shutting down AM.
TEZ-2511. Add exitCode to diagnostics when container fails.
http://git-wip-us.apache.org/repos/asf/tez/blob/be7e63b2/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 9faf8c0..a5cab86 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -93,9 +93,9 @@ public class LocalContainerLauncher extends AbstractService implements
private final ExecutionContext executionContext;
private int numExecutors;
- private final ConcurrentHashMap<ContainerId, ListenableFuture<TezChild.ContainerExecutionResult>>
+ private final ConcurrentHashMap<ContainerId, RunningTaskCallback>
runningContainers =
- new ConcurrentHashMap<ContainerId, ListenableFuture<TezChild.ContainerExecutionResult>>();
+ new ConcurrentHashMap<ContainerId, RunningTaskCallback>();
private final ExecutorService callbackExecutor = Executors.newFixedThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("CallbackExecutor").build());
@@ -144,6 +144,7 @@ public class LocalContainerLauncher extends AbstractService implements
public void serviceStop() throws Exception {
if (!serviceStopped.compareAndSet(false, true)) {
LOG.info("Service Already stopped. Ignoring additional stop");
+ return;
}
if (eventHandlingThread != null) {
eventHandlingThread.interrupt();
@@ -184,6 +185,9 @@ public class LocalContainerLauncher extends AbstractService implements
LOG.error("TezSubTaskRunner interrupted ", e);
}
return;
+ } catch (Throwable e) {
+ LOG.error("TezSubTaskRunner failed due to exception", e);
+ throw e;
}
}
}
@@ -229,24 +233,29 @@ public class LocalContainerLauncher extends AbstractService implements
}
ListenableFuture<TezChild.ContainerExecutionResult> runningTaskFuture =
taskExecutorService.submit(createSubTask(tezChild, event.getContainerId()));
- runningContainers.put(event.getContainerId(), runningTaskFuture);
- Futures.addCallback(runningTaskFuture,
- new RunningTaskCallback(context, event.getContainerId(), tezChild), callbackExecutor);
+ RunningTaskCallback callback = new RunningTaskCallback(context, event.getContainerId());
+ runningContainers.put(event.getContainerId(), callback);
+ Futures.addCallback(runningTaskFuture, callback, callbackExecutor);
} catch (RejectedExecutionException e) {
handleLaunchFailed(e, event.getContainerId());
}
}
private void stop(NMCommunicatorStopRequestEvent event) {
- ListenableFuture<TezChild.ContainerExecutionResult> future =
+ // A stop_request will come in when a task completes and reports back or a preemption decision
+ // is made. Currently the LocalTaskScheduler does not support preemption. Also preemption
+ // will not work in local mode till Tez supports task preemption instead of container preemption.
+ RunningTaskCallback callback =
runningContainers.get(event.getContainerId());
- if (future == null) {
+ if (callback == null) {
LOG.info("Ignoring stop request for containerId: " + event.getContainerId());
} else {
- LOG.info("Interrupting running/queued container with id: " + event.getContainerId());
- future.cancel(true);
- // This will work only if the running task respects Interrupts - which at the moment is
- // not the case for parts of the Runtime.
+ LOG.info(
+ "Ignoring stop request for containerId {}. Relying on regular task shutdown for it to end",
+ event.getContainerId());
+ // Allow the tezChild thread to run its course. It'll receive a shutdown request from the
+ // AM eventually since the task and container will be unregistered.
+ // This will need to be fixed once interrupting tasks is supported.
}
// Send this event to maintain regular control flow. This isn't of much use though.
context.getEventHandler().handle(
@@ -258,17 +267,16 @@ public class LocalContainerLauncher extends AbstractService implements
private final AppContext appContext;
private final ContainerId containerId;
- private final TezChild tezChild;
- RunningTaskCallback(AppContext appContext, ContainerId containerId, TezChild tezChild) {
+ RunningTaskCallback(AppContext appContext, ContainerId containerId) {
this.appContext = appContext;
this.containerId = containerId;
- this.tezChild = tezChild;
}
@Override
public void onSuccess(TezChild.ContainerExecutionResult result) {
runningContainers.remove(containerId);
+ LOG.info("ContainerExecutionResult for: " + containerId + " = " + result);
if (result.getExitStatus() == TezChild.ContainerExecutionResult.ExitStatus.SUCCESS ||
result.getExitStatus() ==
TezChild.ContainerExecutionResult.ExitStatus.ASKED_TO_DIE) {
@@ -289,8 +297,8 @@ public class LocalContainerLauncher extends AbstractService implements
@Override
public void onFailure(Throwable t) {
runningContainers.remove(containerId);
- tezChild.shutdown();
// Ignore CancellationException since that is triggered by the LocalContainerLauncher itself
+ // TezChild would have exited by this time. There's no need to invoke shutdown again.
if (!(t instanceof CancellationException)) {
LOG.info("Container: " + containerId + ": Execution Failed: ", t);
// Inform of failure with exit code 1.
@@ -317,6 +325,9 @@ public class LocalContainerLauncher extends AbstractService implements
@Override
public TezChild.ContainerExecutionResult call() throws InterruptedException, TezException,
IOException {
+ // Reset the interrupt status. Ideally the thread should not be in an interrupted state.
+ // TezTaskRunner needs to be fixed to ensure this.
+ Thread.interrupted();
// Inform about the launch request now that the container has been allocated a thread to execute in.
context.getEventHandler().handle(new AMContainerEventLaunched(containerId));
ContainerLaunchedEvent lEvt =
http://git-wip-us.apache.org/repos/asf/tez/blob/be7e63b2/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
index 06b90d8..8ee30c5 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
@@ -54,7 +54,7 @@ public class ContainerReporter extends CallableWithNdc<ContainerTask> {
@Override
protected ContainerTask callInternal() throws Exception {
ContainerTask containerTask = null;
- LOG.info("Attempting to fetch new task");
+ LOG.info("Attempting to fetch new task for container {}", containerContext.getContainerIdentifier());
containerTask = umbilical.getTask(containerContext);
long getTaskPollStartTime = System.currentTimeMillis();
nextGetTaskPrintTime = getTaskPollStartTime + LOG_INTERVAL;
@@ -64,7 +64,7 @@ public class ContainerReporter extends CallableWithNdc<ContainerTask> {
TimeUnit.MILLISECONDS.sleep(sleepTimeMilliSecs);
containerTask = umbilical.getTask(containerContext);
}
- LOG.info("Got TaskUpdate: "
+ LOG.info("Got TaskUpdate for containerId= " + containerContext.getContainerIdentifier() + ": "
+ (System.currentTimeMillis() - getTaskPollStartTime)
+ " ms after starting to poll."
+ " TaskInfo: shouldDie: "
http://git-wip-us.apache.org/repos/asf/tez/blob/be7e63b2/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 36297a9..062b497 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -196,7 +197,7 @@ public class TezChild {
UserGroupInformation childUGI = null;
- while (!executor.isTerminated()) {
+ while (!executor.isTerminated() && !isShutdown.get()) {
if (taskCount > 0) {
TezUtilsInternal.updateLoggers("");
}
@@ -212,7 +213,7 @@ public class TezChild {
cause, "Execution Exception while fetching new work: " + e.getMessage());
} catch (InterruptedException e) {
error = true;
- LOG.info("Interrupted while waiting for new work");
+ LOG.info("Interrupted while waiting for new work for container {}", containerIdString);
return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.INTERRUPTED, e,
"Interrupted while waiting for new work");
} finally {
@@ -221,7 +222,7 @@ public class TezChild {
}
}
if (containerTask.shouldDie()) {
- LOG.info("ContainerTask returned shouldDie=true, Exiting");
+ LOG.info("ContainerTask returned shouldDie=true for container {}, Exiting", containerIdString);
shutdown();
return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
"Asked to die by the AM");
@@ -249,7 +250,7 @@ public class TezChild {
try {
shouldDie = !taskRunner.run();
if (shouldDie) {
- LOG.info("Got a shouldDie notification via hearbeats. Shutting down");
+ LOG.info("Got a shouldDie notification via heartbeats for container {}. Shutting down", containerIdString);
shutdown();
return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
"Asked to die by the AM");
@@ -355,8 +356,15 @@ public class TezChild {
}
public void shutdown() {
+ LOG.info("Shutdown invoked for container {}", containerIdString);
if (!isShutdown.getAndSet(true)) {
- executor.shutdownNow();
+ LOG.info("Shutting down container {}", containerIdString);
+ // It's possible that there are pending tasks on the executor. Those should be cancelled.
+ List<Runnable> pendingRunnables = executor.shutdownNow();
+ for (Runnable r : pendingRunnables) {
+ LOG.info("Cancelling pending runnables during TezChild shutdown for containerId={}", containerIdString);
+ ((FutureTask)r).cancel(false);
+ }
if (taskReporter != null) {
taskReporter.shutdown();
}
@@ -413,6 +421,15 @@ public class TezChild {
public String getErrorMessage() {
return this.errorMessage;
}
+
+ @Override
+ public String toString() {
+ return "ContainerExecutionResult{" +
+ "exitStatus=" + exitStatus +
+ ", throwable=" + throwable +
+ ", errorMessage='" + errorMessage + '\'' +
+ '}';
+ }
}
public static TezChild newTezChild(Configuration conf, String host, int port, String containerIdentifier,