You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/06/16 20:10:34 UTC

tez git commit: TEZ-2475. Fix a potential hang in Tez local mode caused by incorrectly handled interrupts. (sseth)

Repository: tez
Updated Branches:
  refs/heads/master 6ae49e99d -> be7e63b28


TEZ-2475. Fix a potential hang in Tez local mode caused by incorrectly
handled interrupts. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/be7e63b2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/be7e63b2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/be7e63b2

Branch: refs/heads/master
Commit: be7e63b28b0193dd06d96e73d2a308c5d5b5ef17
Parents: 6ae49e9
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Jun 16 11:10:28 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Jun 16 11:10:28 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../app/launcher/LocalContainerLauncher.java    | 41 +++++++++++++-------
 .../tez/runtime/task/ContainerReporter.java     |  4 +-
 .../org/apache/tez/runtime/task/TezChild.java   | 27 ++++++++++---
 4 files changed, 51 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/be7e63b2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1d5d095..cca47fd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -248,6 +248,7 @@ Release 0.6.2: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2475. Fix a potential hang in Tez local mode caused by incorrectly handled interrupts.
   TEZ-2548. TezClient submitDAG can hang if the AM is in the process of shutting down.
   TEZ-2534. Error handling summary event when shutting down AM.
   TEZ-2511. Add exitCode to diagnostics when container fails.

http://git-wip-us.apache.org/repos/asf/tez/blob/be7e63b2/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 9faf8c0..a5cab86 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -93,9 +93,9 @@ public class LocalContainerLauncher extends AbstractService implements
   private final ExecutionContext executionContext;
   private int numExecutors;
 
-  private final ConcurrentHashMap<ContainerId, ListenableFuture<TezChild.ContainerExecutionResult>>
+  private final ConcurrentHashMap<ContainerId, RunningTaskCallback>
       runningContainers =
-      new ConcurrentHashMap<ContainerId, ListenableFuture<TezChild.ContainerExecutionResult>>();
+      new ConcurrentHashMap<ContainerId, RunningTaskCallback>();
 
   private final ExecutorService callbackExecutor = Executors.newFixedThreadPool(1,
       new ThreadFactoryBuilder().setDaemon(true).setNameFormat("CallbackExecutor").build());
@@ -144,6 +144,7 @@ public class LocalContainerLauncher extends AbstractService implements
   public void serviceStop() throws Exception {
     if (!serviceStopped.compareAndSet(false, true)) {
       LOG.info("Service Already stopped. Ignoring additional stop");
+      return;
     }
     if (eventHandlingThread != null) {
       eventHandlingThread.interrupt();
@@ -184,6 +185,9 @@ public class LocalContainerLauncher extends AbstractService implements
             LOG.error("TezSubTaskRunner interrupted ", e);
           }
           return;
+        } catch (Throwable e) {
+          LOG.error("TezSubTaskRunner failed due to exception", e);
+          throw e;
         }
       }
     }
@@ -229,24 +233,29 @@ public class LocalContainerLauncher extends AbstractService implements
       }
       ListenableFuture<TezChild.ContainerExecutionResult> runningTaskFuture =
           taskExecutorService.submit(createSubTask(tezChild, event.getContainerId()));
-      runningContainers.put(event.getContainerId(), runningTaskFuture);
-      Futures.addCallback(runningTaskFuture,
-          new RunningTaskCallback(context, event.getContainerId(), tezChild), callbackExecutor);
+      RunningTaskCallback callback = new RunningTaskCallback(context, event.getContainerId());
+      runningContainers.put(event.getContainerId(), callback);
+      Futures.addCallback(runningTaskFuture, callback, callbackExecutor);
     } catch (RejectedExecutionException e) {
       handleLaunchFailed(e, event.getContainerId());
     }
   }
 
   private void stop(NMCommunicatorStopRequestEvent event) {
-    ListenableFuture<TezChild.ContainerExecutionResult> future =
+    // A stop_request will come in when a task completes and reports back or a preemption decision
+    // is made. Currently the LocalTaskScheduler does not support preemption. Also preemption
+    // will not work in local mode till Tez supports task preemption instead of container preemption.
+    RunningTaskCallback callback =
         runningContainers.get(event.getContainerId());
-    if (future == null) {
+    if (callback == null) {
       LOG.info("Ignoring stop request for containerId: " + event.getContainerId());
     } else {
-      LOG.info("Interrupting running/queued container with id: " + event.getContainerId());
-      future.cancel(true);
-      // This will work only if the running task respects Interrupts - which at the moment is
-      // not the case for parts of the Runtime.
+      LOG.info(
+          "Ignoring stop request for containerId {}. Relying on regular task shutdown for it to end",
+          event.getContainerId());
+      // Allow the tezChild thread to run its course. It'll receive a shutdown request from the
+      // AM eventually since the task and container will be unregistered.
+      // This will need to be fixed once interrupting tasks is supported.
     }
     // Send this event to maintain regular control flow. This isn't of much use though.
     context.getEventHandler().handle(
@@ -258,17 +267,16 @@ public class LocalContainerLauncher extends AbstractService implements
 
     private final AppContext appContext;
     private final ContainerId containerId;
-    private final TezChild tezChild;
 
-    RunningTaskCallback(AppContext appContext, ContainerId containerId, TezChild tezChild) {
+    RunningTaskCallback(AppContext appContext, ContainerId containerId) {
       this.appContext = appContext;
       this.containerId = containerId;
-      this.tezChild = tezChild;
     }
 
     @Override
     public void onSuccess(TezChild.ContainerExecutionResult result) {
       runningContainers.remove(containerId);
+      LOG.info("ContainerExecutionResult for: " + containerId + " = " + result);
       if (result.getExitStatus() == TezChild.ContainerExecutionResult.ExitStatus.SUCCESS ||
           result.getExitStatus() ==
               TezChild.ContainerExecutionResult.ExitStatus.ASKED_TO_DIE) {
@@ -289,8 +297,8 @@ public class LocalContainerLauncher extends AbstractService implements
     @Override
     public void onFailure(Throwable t) {
       runningContainers.remove(containerId);
-      tezChild.shutdown();
       // Ignore CancellationException since that is triggered by the LocalContainerLauncher itself
+      // TezChild would have exited by this time. There's no need to invoke shutdown again.
       if (!(t instanceof CancellationException)) {
         LOG.info("Container: " + containerId + ": Execution Failed: ", t);
         // Inform of failure with exit code 1.
@@ -317,6 +325,9 @@ public class LocalContainerLauncher extends AbstractService implements
       @Override
       public TezChild.ContainerExecutionResult call() throws InterruptedException, TezException,
           IOException {
+        // Reset the interrupt status. Ideally the thread should not be in an interrupted state.
+        // TezTaskRunner needs to be fixed to ensure this.
+        Thread.interrupted();
         // Inform about the launch request now that the container has been allocated a thread to execute in.
         context.getEventHandler().handle(new AMContainerEventLaunched(containerId));
         ContainerLaunchedEvent lEvt =

http://git-wip-us.apache.org/repos/asf/tez/blob/be7e63b2/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
index 06b90d8..8ee30c5 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
@@ -54,7 +54,7 @@ public class ContainerReporter extends CallableWithNdc<ContainerTask> {
   @Override
   protected ContainerTask callInternal() throws Exception {
     ContainerTask containerTask = null;
-    LOG.info("Attempting to fetch new task");
+    LOG.info("Attempting to fetch new task for container {}", containerContext.getContainerIdentifier());
     containerTask = umbilical.getTask(containerContext);
     long getTaskPollStartTime = System.currentTimeMillis();
     nextGetTaskPrintTime = getTaskPollStartTime + LOG_INTERVAL;
@@ -64,7 +64,7 @@ public class ContainerReporter extends CallableWithNdc<ContainerTask> {
       TimeUnit.MILLISECONDS.sleep(sleepTimeMilliSecs);
       containerTask = umbilical.getTask(containerContext);
     }
-    LOG.info("Got TaskUpdate: "
+    LOG.info("Got TaskUpdate for containerId= " + containerContext.getContainerIdentifier() + ": "
         + (System.currentTimeMillis() - getTaskPollStartTime)
         + " ms after starting to poll."
         + " TaskInfo: shouldDie: "

http://git-wip-us.apache.org/repos/asf/tez/blob/be7e63b2/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 36297a9..062b497 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -196,7 +197,7 @@ public class TezChild {
 
     UserGroupInformation childUGI = null;
 
-    while (!executor.isTerminated()) {
+    while (!executor.isTerminated() && !isShutdown.get()) {
       if (taskCount > 0) {
         TezUtilsInternal.updateLoggers("");
       }
@@ -212,7 +213,7 @@ public class TezChild {
             cause, "Execution Exception while fetching new work: " + e.getMessage());
       } catch (InterruptedException e) {
         error = true;
-        LOG.info("Interrupted while waiting for new work");
+        LOG.info("Interrupted while waiting for new work for container {}", containerIdString);
         return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.INTERRUPTED, e,
             "Interrupted while waiting for new work");
       } finally {
@@ -221,7 +222,7 @@ public class TezChild {
         }
       }
       if (containerTask.shouldDie()) {
-        LOG.info("ContainerTask returned shouldDie=true, Exiting");
+        LOG.info("ContainerTask returned shouldDie=true for container {}, Exiting", containerIdString);
         shutdown();
         return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
             "Asked to die by the AM");
@@ -249,7 +250,7 @@ public class TezChild {
         try {
           shouldDie = !taskRunner.run();
           if (shouldDie) {
-            LOG.info("Got a shouldDie notification via hearbeats. Shutting down");
+            LOG.info("Got a shouldDie notification via heartbeats for container {}. Shutting down", containerIdString);
             shutdown();
             return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
                 "Asked to die by the AM");
@@ -355,8 +356,15 @@ public class TezChild {
   }
 
   public void shutdown() {
+    LOG.info("Shutdown invoked for container {}", containerIdString);
     if (!isShutdown.getAndSet(true)) {
-      executor.shutdownNow();
+      LOG.info("Shutting down container {}", containerIdString);
+      // It's possible that there are pending tasks on the executor. Those should be cancelled.
+      List<Runnable> pendingRunnables = executor.shutdownNow();
+      for (Runnable r : pendingRunnables) {
+        LOG.info("Cancelling pending runnables during TezChild shutdown for containerId={}", containerIdString);
+        ((FutureTask)r).cancel(false);
+      }
       if (taskReporter != null) {
         taskReporter.shutdown();
       }
@@ -413,6 +421,15 @@ public class TezChild {
     public String getErrorMessage() {
       return this.errorMessage;
     }
+
+    @Override
+    public String toString() {
+      return "ContainerExecutionResult{" +
+          "exitStatus=" + exitStatus +
+          ", throwable=" + throwable +
+          ", errorMessage='" + errorMessage + '\'' +
+          '}';
+    }
   }
 
   public static TezChild newTezChild(Configuration conf, String host, int port, String containerIdentifier,