You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/01/16 01:56:14 UTC

tez git commit: TEZ-1962. Fix a thread leak in LocalMode. (sseth) (cherry picked from commit 8bf99a8df80443ddc36916d21bea2b2c3c1e1013)

Repository: tez
Updated Branches:
  refs/heads/branch-0.5 c89b72e0e -> 5f0ca16fe


TEZ-1962. Fix a thread leak in LocalMode. (sseth)
(cherry picked from commit 8bf99a8df80443ddc36916d21bea2b2c3c1e1013)

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5f0ca16f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5f0ca16f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5f0ca16f

Branch: refs/heads/branch-0.5
Commit: 5f0ca16fe66e4a67f7be6e561ab98994170134c6
Parents: c89b72e
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Jan 15 16:50:49 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Jan 15 16:55:51 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../app/launcher/LocalContainerLauncher.java    | 72 +++++++++++++-------
 .../org/apache/tez/runtime/task/TezChild.java   | 34 +++++----
 3 files changed, 71 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/5f0ca16f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e62f807..c453812 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-1962. Fix a thread leak in LocalMode.
   TEZ-1878. Task-specific log level override not working in certain conditions.
   TEZ-1775. Allow setting log level per logger.
   TEZ-1851. FileSystem counters do not differentiate between different FileSystems.

http://git-wip-us.apache.org/repos/asf/tez/blob/5f0ca16f/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index baeb9a3..75e2c0b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -181,6 +181,17 @@ public class LocalContainerLauncher extends AbstractService implements
     context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, message));
   }
 
+  private void handleLaunchFailed(Throwable t, ContainerId containerId) {
+    String message;
+    if (t instanceof RejectedExecutionException) {
+      message = "Failed to queue container launch for container Id: " + containerId;
+    } else {
+      message = "Failed to launch container for container Id: " + containerId;
+    }
+    LOG.error(message, t);
+    sendContainerLaunchFailedMsg(containerId, message);
+  }
+
   //launch tasks
   private void launch(NMCommunicatorLaunchRequestEvent event) {
 
@@ -190,19 +201,29 @@ public class LocalContainerLauncher extends AbstractService implements
         TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name()));
 
     try {
+      TezChild tezChild;
+      try {
+        tezChild =
+            createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier,
+                context.getApplicationAttemptId().getAttemptId(), localDirs,
+                (TezTaskUmbilicalProtocol) taskAttemptListener);
+      } catch (InterruptedException e) {
+        handleLaunchFailed(e, event.getContainerId());
+        return;
+      } catch (TezException e) {
+        handleLaunchFailed(e, event.getContainerId());
+        return;
+      } catch (IOException e) {
+        handleLaunchFailed(e, event.getContainerId());
+        return;
+      }
       ListenableFuture<TezChild.ContainerExecutionResult> runningTaskFuture =
-          taskExecutorService.submit(createSubTask(context.getAMConf(),
-              event.getContainerId(), tokenIdentifier,
-              context.getApplicationAttemptId().getAttemptId(),
-              localDirs, (TezTaskUmbilicalProtocol) taskAttemptListener));
+          taskExecutorService.submit(createSubTask(tezChild, event.getContainerId()));
       runningContainers.put(event.getContainerId(), runningTaskFuture);
-      Futures
-          .addCallback(runningTaskFuture, new RunningTaskCallback(context, event.getContainerId()),
-              callbackExecutor);
+      Futures.addCallback(runningTaskFuture,
+          new RunningTaskCallback(context, event.getContainerId(), tezChild), callbackExecutor);
     } catch (RejectedExecutionException e) {
-      String message = "Failed to queue container launch for container Id: " + event.getContainerId();
-      LOG.error(message, e);
-      sendContainerLaunchFailedMsg(event.getContainerId(), message);
+      handleLaunchFailed(e, event.getContainerId());
     }
   }
 
@@ -227,10 +248,12 @@ public class LocalContainerLauncher extends AbstractService implements
 
     private final AppContext appContext;
     private final ContainerId containerId;
+    private final TezChild tezChild;
 
-    RunningTaskCallback(AppContext appContext, ContainerId containerId) {
+    RunningTaskCallback(AppContext appContext, ContainerId containerId, TezChild tezChild) {
       this.appContext = appContext;
       this.containerId = containerId;
+      this.tezChild = tezChild;
     }
 
     @Override
@@ -256,6 +279,7 @@ public class LocalContainerLauncher extends AbstractService implements
     @Override
     public void onFailure(Throwable t) {
       runningContainers.remove(containerId);
+      tezChild.shutdown();
       // Ignore CancellationException since that is triggered by the LocalContainerLauncher itself
       if (!(t instanceof CancellationException)) {
         LOG.info("Container: " + containerId + ": Execution Failed: ", t);
@@ -277,12 +301,7 @@ public class LocalContainerLauncher extends AbstractService implements
 
   //create a SubTask
   private synchronized Callable<TezChild.ContainerExecutionResult> createSubTask(
-      final Configuration defaultConf,
-      final ContainerId containerId,
-      final String tokenIdentifier,
-      final int attemptNumber,
-      final String[] localDirs,
-      final TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol) {
+      final TezChild tezChild, final ContainerId containerId) {
 
     return new Callable<TezChild.ContainerExecutionResult>() {
       @Override
@@ -295,16 +314,23 @@ public class LocalContainerLauncher extends AbstractService implements
                 context.getApplicationAttemptId());
         context.getHistoryHandler().handle(new DAGHistoryEvent(context.getCurrentDAGID(), lEvt));
 
-        // Pull in configuration specified for the session.
-        TezChild tezChild =
-            TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier,
-                attemptNumber, localDirs, workingDirectory);
-        tezChild.setUmbilical(tezTaskUmbilicalProtocol);
         return tezChild.run();
       }
     };
   }
 
+  private TezChild createTezChild(Configuration defaultConf, ContainerId containerId,
+                                  String tokenIdentifier, int attemptNumber, String[] localDirs,
+                                  TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol) throws
+      InterruptedException, TezException, IOException {
+
+    TezChild tezChild =
+        TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier,
+            attemptNumber, localDirs, workingDirectory);
+    tezChild.setUmbilical(tezTaskUmbilicalProtocol);
+    return tezChild;
+  }
+
   @Override
   public void handle(NMCommunicatorEvent event) {
     try {
@@ -313,4 +339,4 @@ public class LocalContainerLauncher extends AbstractService implements
       throw new TezUncheckedException(e);
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/5f0ca16f/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 05daf5a..3631ca5 100644
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
@@ -95,6 +96,7 @@ public class TezChild {
   private final ListeningExecutorService executor;
   private final ObjectRegistryImpl objectRegistry;
   private final Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
+  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
 
   private Multimap<String, String> startedInputsMap = HashMultimap.create();
 
@@ -179,20 +181,24 @@ public class TezChild {
         TezUtilsInternal.updateLoggers("");
       }
       ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
+      boolean error = false;
       ContainerTask containerTask = null;
       try {
         containerTask = getTaskFuture.get();
       } catch (ExecutionException e) {
+        error = true;
         Throwable cause = e.getCause();
-        handleError(cause);
         return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
             cause, "Execution Exception while fetching new work: " + e.getMessage());
       } catch (InterruptedException e) {
-        LOG.info("Interrupted while waiting for new work:"
-            + containerTask.getTaskSpec().getTaskAttemptID());
-        handleError(e);
+        error = true;
+        LOG.info("Interrupted while waiting for new work");
         return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.INTERRUPTED, e,
             "Interrupted while waiting for new work");
+      } finally {
+        if (error) {
+          shutdown();
+        }
       }
       if (containerTask.shouldDie()) {
         LOG.info("ContainerTask returned shouldDie=true, Exiting");
@@ -323,15 +329,17 @@ public class TezChild {
     lastVertexID = newVertexID;
   }
 
-  private void shutdown() {
-    executor.shutdownNow();
-    if (taskReporter != null) {
-      taskReporter.shutdown();
-    }
-    RPC.stopProxy(umbilical);
-    DefaultMetricsSystem.shutdown();
-    if (!isLocal) {
-      LogManager.shutdown();
+  public void shutdown() {
+    if (!isShutdown.getAndSet(true)) {
+      executor.shutdownNow();
+      if (taskReporter != null) {
+        taskReporter.shutdown();
+      }
+      DefaultMetricsSystem.shutdown();
+      if (!isLocal) {
+        RPC.stopProxy(umbilical);
+        LogManager.shutdown();
+      }
     }
   }