You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/01/16 01:56:14 UTC
tez git commit: TEZ-1962. Fix a thread leak in LocalMode. (sseth)
(cherry picked from commit 8bf99a8df80443ddc36916d21bea2b2c3c1e1013)
Repository: tez
Updated Branches:
refs/heads/branch-0.5 c89b72e0e -> 5f0ca16fe
TEZ-1962. Fix a thread leak in LocalMode. (sseth)
(cherry picked from commit 8bf99a8df80443ddc36916d21bea2b2c3c1e1013)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5f0ca16f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5f0ca16f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5f0ca16f
Branch: refs/heads/branch-0.5
Commit: 5f0ca16fe66e4a67f7be6e561ab98994170134c6
Parents: c89b72e
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Jan 15 16:50:49 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Jan 15 16:55:51 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../app/launcher/LocalContainerLauncher.java | 72 +++++++++++++-------
.../org/apache/tez/runtime/task/TezChild.java | 34 +++++----
3 files changed, 71 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/5f0ca16f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e62f807..c453812 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
Release 0.5.4: Unreleased
ALL CHANGES:
+ TEZ-1962. Fix a thread leak in LocalMode.
TEZ-1878. Task-specific log level override not working in certain conditions.
TEZ-1775. Allow setting log level per logger.
TEZ-1851. FileSystem counters do not differentiate between different FileSystems.
http://git-wip-us.apache.org/repos/asf/tez/blob/5f0ca16f/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index baeb9a3..75e2c0b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -181,6 +181,17 @@ public class LocalContainerLauncher extends AbstractService implements
context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, message));
}
+ private void handleLaunchFailed(Throwable t, ContainerId containerId) {
+ String message;
+ if (t instanceof RejectedExecutionException) {
+ message = "Failed to queue container launch for container Id: " + containerId;
+ } else {
+ message = "Failed to launch container for container Id: " + containerId;
+ }
+ LOG.error(message, t);
+ sendContainerLaunchFailedMsg(containerId, message);
+ }
+
//launch tasks
private void launch(NMCommunicatorLaunchRequestEvent event) {
@@ -190,19 +201,29 @@ public class LocalContainerLauncher extends AbstractService implements
TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name()));
try {
+ TezChild tezChild;
+ try {
+ tezChild =
+ createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier,
+ context.getApplicationAttemptId().getAttemptId(), localDirs,
+ (TezTaskUmbilicalProtocol) taskAttemptListener);
+ } catch (InterruptedException e) {
+ handleLaunchFailed(e, event.getContainerId());
+ return;
+ } catch (TezException e) {
+ handleLaunchFailed(e, event.getContainerId());
+ return;
+ } catch (IOException e) {
+ handleLaunchFailed(e, event.getContainerId());
+ return;
+ }
ListenableFuture<TezChild.ContainerExecutionResult> runningTaskFuture =
- taskExecutorService.submit(createSubTask(context.getAMConf(),
- event.getContainerId(), tokenIdentifier,
- context.getApplicationAttemptId().getAttemptId(),
- localDirs, (TezTaskUmbilicalProtocol) taskAttemptListener));
+ taskExecutorService.submit(createSubTask(tezChild, event.getContainerId()));
runningContainers.put(event.getContainerId(), runningTaskFuture);
- Futures
- .addCallback(runningTaskFuture, new RunningTaskCallback(context, event.getContainerId()),
- callbackExecutor);
+ Futures.addCallback(runningTaskFuture,
+ new RunningTaskCallback(context, event.getContainerId(), tezChild), callbackExecutor);
} catch (RejectedExecutionException e) {
- String message = "Failed to queue container launch for container Id: " + event.getContainerId();
- LOG.error(message, e);
- sendContainerLaunchFailedMsg(event.getContainerId(), message);
+ handleLaunchFailed(e, event.getContainerId());
}
}
@@ -227,10 +248,12 @@ public class LocalContainerLauncher extends AbstractService implements
private final AppContext appContext;
private final ContainerId containerId;
+ private final TezChild tezChild;
- RunningTaskCallback(AppContext appContext, ContainerId containerId) {
+ RunningTaskCallback(AppContext appContext, ContainerId containerId, TezChild tezChild) {
this.appContext = appContext;
this.containerId = containerId;
+ this.tezChild = tezChild;
}
@Override
@@ -256,6 +279,7 @@ public class LocalContainerLauncher extends AbstractService implements
@Override
public void onFailure(Throwable t) {
runningContainers.remove(containerId);
+ tezChild.shutdown();
// Ignore CancellationException since that is triggered by the LocalContainerLauncher itself
if (!(t instanceof CancellationException)) {
LOG.info("Container: " + containerId + ": Execution Failed: ", t);
@@ -277,12 +301,7 @@ public class LocalContainerLauncher extends AbstractService implements
//create a SubTask
private synchronized Callable<TezChild.ContainerExecutionResult> createSubTask(
- final Configuration defaultConf,
- final ContainerId containerId,
- final String tokenIdentifier,
- final int attemptNumber,
- final String[] localDirs,
- final TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol) {
+ final TezChild tezChild, final ContainerId containerId) {
return new Callable<TezChild.ContainerExecutionResult>() {
@Override
@@ -295,16 +314,23 @@ public class LocalContainerLauncher extends AbstractService implements
context.getApplicationAttemptId());
context.getHistoryHandler().handle(new DAGHistoryEvent(context.getCurrentDAGID(), lEvt));
- // Pull in configuration specified for the session.
- TezChild tezChild =
- TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier,
- attemptNumber, localDirs, workingDirectory);
- tezChild.setUmbilical(tezTaskUmbilicalProtocol);
return tezChild.run();
}
};
}
+ private TezChild createTezChild(Configuration defaultConf, ContainerId containerId,
+ String tokenIdentifier, int attemptNumber, String[] localDirs,
+ TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol) throws
+ InterruptedException, TezException, IOException {
+
+ TezChild tezChild =
+ TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier,
+ attemptNumber, localDirs, workingDirectory);
+ tezChild.setUmbilical(tezTaskUmbilicalProtocol);
+ return tezChild;
+ }
+
@Override
public void handle(NMCommunicatorEvent event) {
try {
@@ -313,4 +339,4 @@ public class LocalContainerLauncher extends AbstractService implements
throw new TezUncheckedException(e);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/5f0ca16f/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 05daf5a..3631ca5 100644
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
@@ -95,6 +96,7 @@ public class TezChild {
private final ListeningExecutorService executor;
private final ObjectRegistryImpl objectRegistry;
private final Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
+ private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private Multimap<String, String> startedInputsMap = HashMultimap.create();
@@ -179,20 +181,24 @@ public class TezChild {
TezUtilsInternal.updateLoggers("");
}
ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
+ boolean error = false;
ContainerTask containerTask = null;
try {
containerTask = getTaskFuture.get();
} catch (ExecutionException e) {
+ error = true;
Throwable cause = e.getCause();
- handleError(cause);
return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
cause, "Execution Exception while fetching new work: " + e.getMessage());
} catch (InterruptedException e) {
- LOG.info("Interrupted while waiting for new work:"
- + containerTask.getTaskSpec().getTaskAttemptID());
- handleError(e);
+ error = true;
+ LOG.info("Interrupted while waiting for new work");
return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.INTERRUPTED, e,
"Interrupted while waiting for new work");
+ } finally {
+ if (error) {
+ shutdown();
+ }
}
if (containerTask.shouldDie()) {
LOG.info("ContainerTask returned shouldDie=true, Exiting");
@@ -323,15 +329,17 @@ public class TezChild {
lastVertexID = newVertexID;
}
- private void shutdown() {
- executor.shutdownNow();
- if (taskReporter != null) {
- taskReporter.shutdown();
- }
- RPC.stopProxy(umbilical);
- DefaultMetricsSystem.shutdown();
- if (!isLocal) {
- LogManager.shutdown();
+ public void shutdown() {
+ if (!isShutdown.getAndSet(true)) {
+ executor.shutdownNow();
+ if (taskReporter != null) {
+ taskReporter.shutdown();
+ }
+ DefaultMetricsSystem.shutdown();
+ if (!isLocal) {
+ RPC.stopProxy(umbilical);
+ LogManager.shutdown();
+ }
}
}