You are viewing a plain-text version of this content. The link to the canonical original was not preserved in this plain-text copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/01/16 01:04:37 UTC
tez git commit: TEZ-1962. Fix a thread leak in LocalMode. (sseth)
Repository: tez
Updated Branches:
refs/heads/master 2762d9b5c -> 2544b05b7
TEZ-1962. Fix a thread leak in LocalMode. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2544b05b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2544b05b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2544b05b
Branch: refs/heads/master
Commit: 2544b05b7d34185c3065eb3e51596ef44ac3f5bb
Parents: 2762d9b
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Jan 15 16:04:12 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Jan 15 16:04:12 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../app/launcher/LocalContainerLauncher.java | 79 +++++++++++++-------
.../org/apache/tez/runtime/task/TezChild.java | 34 +++++----
3 files changed, 75 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/2544b05b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dfa12cd..4848835 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -124,6 +124,7 @@ TEZ-UI CHANGES (TEZ-8):
Release 0.5.4: Unreleased
ALL CHANGES:
+ TEZ-1962. Fix a thread leak in LocalMode.
TEZ-1924. Tez AM does not register with AM with full FQDN causing jobs
to fail in some environments.
TEZ-1878. Task-specific log level override not working in certain conditions.
http://git-wip-us.apache.org/repos/asf/tez/blob/2544b05b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index ea0f9a5..0343828 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -182,24 +182,45 @@ public class LocalContainerLauncher extends AbstractService implements
context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, message));
}
+ private void handleLaunchFailed(Throwable t, ContainerId containerId) {
+ String message;
+ if (t instanceof RejectedExecutionException) {
+ message = "Failed to queue container launch for container Id: " + containerId;
+ } else {
+ message = "Failed to launch container for container Id: " + containerId;
+ }
+ LOG.error(message, t);
+ sendContainerLaunchFailedMsg(containerId, message);
+ }
+
//launch tasks
private void launch(NMCommunicatorLaunchRequestEvent event) {
String tokenIdentifier = context.getApplicationID().toString();
try {
+ TezChild tezChild;
+ try {
+ tezChild =
+ createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier,
+ context.getApplicationAttemptId().getAttemptId(), context.getLocalDirs(),
+ (TezTaskUmbilicalProtocol) taskAttemptListener);
+ } catch (InterruptedException e) {
+ handleLaunchFailed(e, event.getContainerId());
+ return;
+ } catch (TezException e) {
+ handleLaunchFailed(e, event.getContainerId());
+ return;
+ } catch (IOException e) {
+ handleLaunchFailed(e, event.getContainerId());
+ return;
+ }
ListenableFuture<TezChild.ContainerExecutionResult> runningTaskFuture =
- taskExecutorService.submit(createSubTask(context.getAMConf(),
- event.getContainerId(), tokenIdentifier,
- context.getApplicationAttemptId().getAttemptId(),
- context.getLocalDirs(), (TezTaskUmbilicalProtocol) taskAttemptListener));
+ taskExecutorService.submit(createSubTask(tezChild, event.getContainerId()));
runningContainers.put(event.getContainerId(), runningTaskFuture);
- Futures
- .addCallback(runningTaskFuture, new RunningTaskCallback(context, event.getContainerId()),
- callbackExecutor);
+ Futures.addCallback(runningTaskFuture,
+ new RunningTaskCallback(context, event.getContainerId(), tezChild), callbackExecutor);
} catch (RejectedExecutionException e) {
- String message = "Failed to queue container launch for container Id: " + event.getContainerId();
- LOG.error(message, e);
- sendContainerLaunchFailedMsg(event.getContainerId(), message);
+ handleLaunchFailed(e, event.getContainerId());
}
}
@@ -224,10 +245,12 @@ public class LocalContainerLauncher extends AbstractService implements
private final AppContext appContext;
private final ContainerId containerId;
+ private final TezChild tezChild;
- RunningTaskCallback(AppContext appContext, ContainerId containerId) {
+ RunningTaskCallback(AppContext appContext, ContainerId containerId, TezChild tezChild) {
this.appContext = appContext;
this.containerId = containerId;
+ this.tezChild = tezChild;
}
@Override
@@ -253,6 +276,7 @@ public class LocalContainerLauncher extends AbstractService implements
@Override
public void onFailure(Throwable t) {
runningContainers.remove(containerId);
+ tezChild.shutdown();
// Ignore CancellationException since that is triggered by the LocalContainerLauncher itself
if (!(t instanceof CancellationException)) {
LOG.info("Container: " + containerId + ": Execution Failed: ", t);
@@ -274,12 +298,7 @@ public class LocalContainerLauncher extends AbstractService implements
//create a SubTask
private synchronized Callable<TezChild.ContainerExecutionResult> createSubTask(
- final Configuration defaultConf,
- final ContainerId containerId,
- final String tokenIdentifier,
- final int attemptNumber,
- final String[] localDirs,
- final TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol) {
+ final TezChild tezChild, final ContainerId containerId) {
return new Callable<TezChild.ContainerExecutionResult>() {
@Override
@@ -292,20 +311,28 @@ public class LocalContainerLauncher extends AbstractService implements
context.getApplicationAttemptId());
context.getHistoryHandler().handle(new DAGHistoryEvent(context.getCurrentDAGID(), lEvt));
-
- Map<String, String> containerEnv = new HashMap<String, String>();
- containerEnv.putAll(localEnv);
- containerEnv.put(Environment.USER.name(), context.getUser());
- // Pull in configuration specified for the session.
- TezChild tezChild =
- TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier,
- attemptNumber, localDirs, workingDirectory, containerEnv, "", ExecutionContext);
- tezChild.setUmbilical(tezTaskUmbilicalProtocol);
return tezChild.run();
}
};
}
+ private TezChild createTezChild(Configuration defaultConf, ContainerId containerId,
+ String tokenIdentifier, int attemptNumber, String[] localDirs,
+ TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol) throws
+ InterruptedException, TezException, IOException {
+ Map<String, String> containerEnv = new HashMap<String, String>();
+ containerEnv.putAll(localEnv);
+ containerEnv.put(Environment.USER.name(), context.getUser());
+
+ TezChild tezChild =
+ TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier,
+ attemptNumber, localDirs, workingDirectory, containerEnv, "", ExecutionContext);
+ tezChild.setUmbilical(tezTaskUmbilicalProtocol);
+ return tezChild;
+ }
+
+
+
@Override
public void handle(NMCommunicatorEvent event) {
try {
http://git-wip-us.apache.org/repos/asf/tez/blob/2544b05b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index c0843a0..a71fc55 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
@@ -101,6 +102,7 @@ public class TezChild {
private final ExecutionContext ExecutionContext;
private final Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
private final Map<String, String> serviceProviderEnvMap;
+ private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private Multimap<String, String> startedInputsMap = HashMultimap.create();
@@ -191,20 +193,24 @@ public class TezChild {
TezUtilsInternal.updateLoggers("");
}
ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
+ boolean error = false;
ContainerTask containerTask = null;
try {
containerTask = getTaskFuture.get();
} catch (ExecutionException e) {
+ error = true;
Throwable cause = e.getCause();
- handleError(cause);
return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
cause, "Execution Exception while fetching new work: " + e.getMessage());
} catch (InterruptedException e) {
- LOG.info("Interrupted while waiting for new work:"
- + containerTask.getTaskSpec().getTaskAttemptID());
- handleError(e);
+ error = true;
+ LOG.info("Interrupted while waiting for new work");
return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.INTERRUPTED, e,
"Interrupted while waiting for new work");
+ } finally {
+ if (error) {
+ shutdown();
+ }
}
if (containerTask.shouldDie()) {
LOG.info("ContainerTask returned shouldDie=true, Exiting");
@@ -336,15 +342,17 @@ public class TezChild {
lastVertexID = newVertexID;
}
- private void shutdown() {
- executor.shutdownNow();
- if (taskReporter != null) {
- taskReporter.shutdown();
- }
- RPC.stopProxy(umbilical);
- DefaultMetricsSystem.shutdown();
- if (!isLocal) {
- LogManager.shutdown();
+ public void shutdown() {
+ if (!isShutdown.getAndSet(true)) {
+ executor.shutdownNow();
+ if (taskReporter != null) {
+ taskReporter.shutdown();
+ }
+ DefaultMetricsSystem.shutdown();
+ if (!isLocal) {
+ RPC.stopProxy(umbilical);
+ LogManager.shutdown();
+ }
}
}