You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/06 11:26:16 UTC

[24/51] [abbrv] tez git commit: TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups. Contributed by Rajesh Balamohan.

TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups. Contributed by Rajesh Balamohan.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e4f7ea0a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e4f7ea0a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e4f7ea0a

Branch: refs/heads/TEZ-2003
Commit: e4f7ea0aa12f9e5b6352e76745898644c530754d
Parents: 9ed22b2
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed May 6 00:39:46 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Aug 6 01:25:34 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  1 +
 .../runtime/LogicalIOProcessorRuntimeTask.java  | 83 ++++++++++++++++++--
 .../org/apache/tez/runtime/RuntimeTask.java     |  5 ++
 .../apache/tez/runtime/task/TezTaskRunner.java  | 71 ++++++++++++++++-
 4 files changed, 152 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e4f7ea0a/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 9fc9ed3..f8a71e8 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -20,5 +20,6 @@ ALL CHANGES:
   TEZ-2361. Propagate dag completion to TaskCommunicator.
   TEZ-2381. Fixes after rebase 04/28.
   TEZ-2388. Send dag identifier as part of the fetcher request string.
+  TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/e4f7ea0a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 84e5e0d..8263b3f 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -20,6 +20,9 @@ package org.apache.tez.runtime;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
 import java.nio.ByteBuffer;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -41,6 +44,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import com.google.common.base.Throwables;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.tez.runtime.api.TaskContext;
 import org.apache.tez.runtime.api.impl.TezProcessorContextImpl;
@@ -174,6 +178,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.runInputMap = new LinkedHashMap<String, LogicalInput>();
     this.runOutputMap = new LinkedHashMap<String, LogicalOutput>();
 
+    this.initializedInputs = new ConcurrentHashMap<String, LogicalInput>();
+    this.initializedOutputs = new ConcurrentHashMap<String, LogicalOutput>();
+
     this.processorDescriptor = taskSpec.getProcessorDescriptor();
     this.serviceConsumerMetadata = serviceConsumerMetadata;
     this.envMap = envMap;
@@ -420,6 +427,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
           taskSpec.getTaskAttemptID());
       initializedInputs.put(edgeName, input);
       LOG.info("Initialized Input with src edge: " + edgeName);
+      initializedInputs.put(edgeName, input);
       return null;
     }
   }
@@ -469,6 +477,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
           outputContext.getDestinationVertexName(), taskSpec.getTaskAttemptID());
       initializedOutputs.put(edgeName, output);
       LOG.info("Initialized Output with dest edge: " + edgeName);
+      initializedOutputs.put(edgeName, output);
       return null;
     }
   }
@@ -694,6 +703,13 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     eventsToBeProcessed.addAll(events);
   }
 
+  /**
+   * Aborts this task by delegating to the processor's abort(), when a processor is set.
+   * Does nothing if the processor reference is still null.
+   *
+   * @throws Exception propagated from the processor's abort()
+   */
+  @Override
+  public synchronized void abortTask() throws Exception {
+    if (processor != null) {
+      processor.abort();
+    }
+  }
+
   private void startRouterThread() {
     eventRouterThread = new Thread(new RunnableWithNdc() {
       public void runInternal() {
@@ -713,6 +729,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
             if (!isTaskDone()) {
               LOG.warn("Event Router thread interrupted. Returning.");
             }
+            Thread.currentThread().interrupt();
             return;
           }
         }
@@ -724,6 +741,12 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     eventRouterThread.start();
   }
 
+  /**
+   * Sets the interrupt flag on the current thread if it is not already set, so the
+   * current thread is always flagged as interrupted when this method returns.
+   *
+   * NOTE(review): the name suggests restoring a previously observed interrupt, but the
+   * code sets the flag unconditionally whenever it is clear - confirm this is intended.
+   */
+  private void maybeResetInterruptStatus() {
+    if (!Thread.currentThread().isInterrupted()) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
   private void closeContexts() throws IOException {
     closeContext(inputContextMap);
     closeContext(outputContextMap);
@@ -763,6 +786,18 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     }
 
     // Close the unclosed IPO
+    /**
+     * Clean up any IPOs that have not been closed. If a regular close() has already happened
+     * for an IPO, it is no longer among the IPOs to be cleaned up, so this is safe.
+     *
+     * e.g. whenever an input is closed in the normal way, it is automatically removed from the
+     * initializedInputs map.
+     *
+     * If an exception happens during processor close or IO close, the entry is not removed
+     * from the initialized IO data structures, and this is the chance to close it and release
+     * its resources.
+     *
+     */
     if (LOG.isDebugEnabled()) {
       LOG.debug("Processor closed={}", processorClosed);
       LOG.debug("Num of inputs to be closed={}", initializedInputs.size());
@@ -773,10 +808,16 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       try {
         processorClosed = true;
         processor.close();
-        LOG.info("Closed processor for vertex={}, index={}",
+        LOG.info("Closed processor for vertex={}, index={}, interruptedStatus={}",
             processor
                 .getContext().getTaskVertexName(),
-            processor.getContext().getTaskVertexIndex());
+            processor.getContext().getTaskVertexIndex(),
+            Thread.currentThread().isInterrupted());
+        maybeResetInterruptStatus();
+      } catch (InterruptedException ie) {
+        //reset the status
+        LOG.info("Resetting interrupt for processor");
+        Thread.currentThread().interrupt();
       } catch (Throwable e) {
         LOG.warn(
             "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}",
@@ -792,13 +833,19 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       inputIterator.remove();
       try {
         ((InputFrameworkInterface)entry.getValue()).close();
+        maybeResetInterruptStatus();
+      } catch (InterruptedException ie) {
+        //reset the status
+        LOG.info("Resetting interrupt status for input with srcVertexName={}",
+            srcVertexName);
+        Thread.currentThread().interrupt();
       } catch (Throwable e) {
         LOG.warn(
             "Ignoring exception when closing input {}(cleanup). Exception class={}, message={}",
             srcVertexName, e.getClass().getName(), e.getMessage());
       } finally {
-        LOG.info("Close input for vertex={}, sourceVertex={}", processor
-            .getContext().getTaskVertexName(), srcVertexName);
+        LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
+            .getContext().getTaskVertexName(), srcVertexName, Thread.currentThread().isInterrupted());
       }
     }
 
@@ -810,16 +857,26 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       outputIterator.remove();
       try {
         ((OutputFrameworkInterface) entry.getValue()).close();
+        maybeResetInterruptStatus();
+      } catch (InterruptedException ie) {
+        //reset the status
+        LOG.info("Resetting interrupt status for output with destVertexName={}",
+            destVertexName);
+        Thread.currentThread().interrupt();
       } catch (Throwable e) {
         LOG.warn(
             "Ignoring exception when closing output {}(cleanup). Exception class={}, message={}",
             destVertexName, e.getClass().getName(), e.getMessage());
       } finally {
-        LOG.info("Close input for vertex={}, sourceVertex={}", processor
-            .getContext().getTaskVertexName(), destVertexName);
+        LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
+            .getContext().getTaskVertexName(), destVertexName, Thread.currentThread().isInterrupted());
       }
     }
 
+    if (LOG.isDebugEnabled()) {
+      printThreads();
+    }
+
     try {
       closeContexts();
       // Cleanup references which may be held by misbehaved tasks.
@@ -867,6 +924,20 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     inputReadyTracker = null;
     objectRegistry = null;
   }
+
+
+  /**
+   * Logs the id and name of every live thread in the JVM (only for debugging).
+   *
+   * Threads that terminate between collecting the ids and looking up their info are skipped.
+   */
+  void printThreads() {
+    ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+    long[] threadIds = threadMXBean.getAllThreadIds();
+    for (long id : threadIds) {
+      ThreadInfo threadInfo = threadMXBean.getThreadInfo(id);
+      // getThreadInfo returns null when the thread is no longer alive; skip it instead of
+      // failing with a NullPointerException in the middle of task cleanup.
+      if (threadInfo == null) {
+        continue;
+      }
+      LOG.info("ThreadId : " + id + ", name=" + threadInfo.getThreadName());
+    }
+  }
   
   @Private
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/tez/blob/e4f7ea0a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index 17d7053..cdfb46a 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -76,6 +76,10 @@ public abstract class RuntimeTask {
 
   protected final AtomicReference<State> state = new AtomicReference<State>();
 
+  /**
+   * @return true if the current value of the task state is State.RUNNING
+   */
+  public boolean isRunning() {
+    return (state.get() == State.RUNNING);
+  }
+
   public TezCounters addAndGetTezCounter(String name) {
     TezCounters counter = new TezCounters();
     counterMap.put(name, counter);
@@ -163,4 +167,5 @@ public abstract class RuntimeTask {
     taskDone.set(true);
   }
 
+  /**
+   * Aborts the task. What is cleaned up is defined by the implementing subclass.
+   *
+   * @throws Exception if the implementation fails while aborting
+   */
+  public abstract void abortTask() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e4f7ea0a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
index 33a7f4a..7238d5e 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -25,8 +25,13 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.base.Throwables;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSError;
@@ -35,6 +40,7 @@ import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.RuntimeTask;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.ObjectRegistry;
 import org.apache.tez.runtime.api.impl.EventMetaData;
@@ -61,6 +67,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
   private final ListeningExecutorService executor;
   private volatile ListenableFuture<Void> taskFuture;
   private volatile Thread waitingThread;
+  private volatile Thread taskRunner;
   private volatile Throwable firstException;
 
   // Effectively a duplicate check, since hadFatalError does the same thing.
@@ -96,7 +103,10 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
     taskReporter.registerTask(task, this);
     TaskRunnerCallable callable = new TaskRunnerCallable();
     Throwable failureCause = null;
-    taskFuture = executor.submit(callable);
+    if (!Thread.currentThread().isInterrupted()) {
+      taskFuture = executor.submit(callable);
+      return isShutdownRequested();
+    }
     try {
       taskFuture.get();
 
@@ -158,6 +168,10 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
         }
       }
     }
+    return isShutdownRequested();
+  }
+
+  private boolean isShutdownRequested() {
     if (shutdownRequested.get()) {
       LOG.info("Shutdown requested... returning");
       return false;
@@ -173,11 +187,14 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
           @Override
           public Void run() throws Exception {
             try {
+              taskRunner = Thread.currentThread();
               LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID());
               task.initialize();
               if (!Thread.currentThread().isInterrupted() && firstException == null) {
                 LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID());
                 task.run();
+                maybeInterruptWaitingThread();
+
                 LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID());
                 task.close();
                 task.setFrameworkCounters();
@@ -199,6 +216,12 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
               }
               return null;
             } catch (Throwable cause) {
+              if (Thread.currentThread().isInterrupted()) {
+                LOG.info("TaskRunnerCallable interrupted=" + Thread.currentThread().isInterrupted()
+                    + ", shutdownRequest=" + shutdownRequested.get());
+                Thread.currentThread().interrupt();
+                return null;
+              }
               if (cause instanceof FSError) {
                 // Not immediately fatal, this is an error reported by Hadoop FileSystem
                 maybeRegisterFirstException(cause);
@@ -255,6 +278,17 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
         taskRunning.set(false);
       }
     }
+
+    // Interrupts the waiting thread when a shutdown has been requested.
+    private void maybeInterruptWaitingThread() {
+      /**
+       * It is possible that the processor swallows the InterruptedException caused by
+       * taskRunner.interrupt(). In that case, interrupt the waitingThread based on the
+       * shutdownRequested flag, so that the entire task gets cancelled.
+       */
+      if (shutdownRequested.get()) {
+        waitingThread.interrupt();
+      }
+    }
   }
 
   // should wait until all messages are sent to AM before TezChild shutdown
@@ -353,10 +387,43 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
     }
   }
 
+  /**
+   * Aborts the task and interrupts the thread that should notice the abort.
+   *
+   * If the task is not marked as running, only the waiting thread is interrupted.
+   * Otherwise task.abortTask() is invoked (a failure there is logged and reported through
+   * sendFailure), and then either the task runner thread or the waiting thread is
+   * interrupted, depending on isTaskRunning().
+   */
+  private void abortRunningTask() {
+    if (!taskRunning.get()) {
+      LOG.info("Task is not running");
+      waitingThread.interrupt();
+      return;
+    }
+
+    // NOTE(review): this second check only differs from the one above if taskRunning was
+    // changed concurrently in between - confirm whether it is needed.
+    if (taskRunning.get()) {
+      try {
+        task.abortTask();
+      } catch (Exception e) {
+        LOG.warn("Error when aborting the task", e);
+        try {
+          sendFailure(e, "Error when aborting the task");
+        } catch (Exception ignored) {
+          // Ignored.
+        }
+      }
+    }
+    //Interrupt the relevant threads.  TaskRunner should be interrupted preferably.
+    // NOTE(review): assumes taskRunner has been assigned whenever isTaskRunning() is true,
+    // and that waitingThread is already set - neither is null-checked here; confirm.
+    if (isTaskRunning()) {
+      LOG.info("Interrupting taskRunner=" + taskRunner.getName());
+      taskRunner.interrupt();
+    } else {
+      LOG.info("Interrupting waitingThread=" + waitingThread.getName());
+      waitingThread.interrupt();
+    }
+  }
+
+  /**
+   * @return true only when this runner's taskRunning flag is set and the task itself
+   *         reports isRunning()
+   */
+  private boolean isTaskRunning() {
+    return (taskRunning.get() && task.isRunning());
+  }
+
   @Override
   public void shutdownRequested() {
     shutdownRequested.set(true);
-    waitingThread.interrupt();
+    abortRunningTask();
   }
 
   private String getTaskDiagnosticsString(Throwable t, String message) {