You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 09:41:52 UTC

[50/50] [abbrv] tez git commit: TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups. Contributed by Rajesh Balamohan.

TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups. Contributed by Rajesh Balamohan.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9338abfe
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9338abfe
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9338abfe

Branch: refs/heads/TEZ-2003
Commit: 9338abfe9a12271e4247a7cf45f3f8fff30e6bb7
Parents: e879128
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed May 6 00:39:46 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed May 6 00:39:46 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |   1 +
 .../runtime/LogicalIOProcessorRuntimeTask.java  | 142 +++++++++++++++++++
 .../org/apache/tez/runtime/RuntimeTask.java     |   5 +
 .../apache/tez/runtime/task/TezTaskRunner.java  |  71 +++++++++-
 4 files changed, 217 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/9338abfe/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 9fc9ed3..f8a71e8 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -20,5 +20,6 @@ ALL CHANGES:
   TEZ-2361. Propagate dag completion to TaskCommunicator.
   TEZ-2381. Fixes after rebase 04/28.
   TEZ-2388. Send dag identifier as part of the fetcher request string.
+  TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/9338abfe/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index f465d3c..7040598 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -20,10 +20,14 @@ package org.apache.tez.runtime;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +41,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import com.google.common.base.Throwables;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.tez.runtime.api.TaskContext;
 import org.apache.tez.runtime.api.impl.TezProcessorContextImpl;
@@ -109,6 +114,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private final List<GroupInputSpec> groupInputSpecs;
   private ConcurrentHashMap<String, MergedLogicalInput> groupInputsMap;
 
+  private final ConcurrentHashMap<String, LogicalInput> initializedInputs;
+  private final ConcurrentHashMap<String, LogicalOutput> initializedOutputs;
+  private boolean processorClosed;
+
   private final ProcessorDescriptor processorDescriptor;
   private AbstractLogicalIOProcessor processor;
   private ProcessorContext processorContext;
@@ -160,6 +169,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.runInputMap = new LinkedHashMap<String, LogicalInput>();
     this.runOutputMap = new LinkedHashMap<String, LogicalOutput>();
 
+    this.initializedInputs = new ConcurrentHashMap<String, LogicalInput>();
+    this.initializedOutputs = new ConcurrentHashMap<String, LogicalOutput>();
+
     this.processorDescriptor = taskSpec.getProcessorDescriptor();
     this.serviceConsumerMetadata = serviceConsumerMetadata;
     this.envMap = envMap;
@@ -338,11 +350,13 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       this.state.set(State.CLOSED);
 
       // Close the Processor.
+      processorClosed = true;
       processor.close();
 
       // Close the Inputs.
       for (InputSpec inputSpec : inputSpecs) {
         String srcVertexName = inputSpec.getSourceVertexName();
+        initializedInputs.remove(srcVertexName);
         List<Event> closeInputEvents = ((InputFrameworkInterface)inputsMap.get(srcVertexName)).close();
         sendTaskGeneratedEvents(closeInputEvents,
             EventProducerConsumerType.INPUT, taskSpec.getVertexName(),
@@ -352,6 +366,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       // Close the Outputs.
       for (OutputSpec outputSpec : outputSpecs) {
         String destVertexName = outputSpec.getDestinationVertexName();
+        initializedOutputs.remove(destVertexName);
         List<Event> closeOutputEvents = ((LogicalOutputFrameworkInterface)outputsMap.get(destVertexName)).close();
         sendTaskGeneratedEvents(closeOutputEvents,
             EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
@@ -391,6 +406,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
           inputContext.getTaskVertexName(), inputContext.getSourceVertexName(),
           taskSpec.getTaskAttemptID());
       LOG.info("Initialized Input with src edge: " + edgeName);
+      initializedInputs.put(edgeName, input);
       return null;
     }
   }
@@ -439,6 +455,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
           outputContext.getTaskVertexName(),
           outputContext.getDestinationVertexName(), taskSpec.getTaskAttemptID());
       LOG.info("Initialized Output with dest edge: " + edgeName);
+      initializedOutputs.put(edgeName, output);
       return null;
     }
   }
@@ -664,6 +681,13 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     eventsToBeProcessed.addAll(events);
   }
 
+  @Override
+  public synchronized void abortTask() throws Exception {
+    if (processor != null) {
+      processor.abort();
+    }
+  }
+
   private void startRouterThread() {
     eventRouterThread = new Thread(new RunnableWithNdc() {
       public void runInternal() {
@@ -683,6 +707,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
             if (!isTaskDone()) {
               LOG.warn("Event Router thread interrupted. Returning.");
             }
+            Thread.currentThread().interrupt();
             return;
           }
         }
@@ -694,6 +719,12 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     eventRouterThread.start();
   }
 
+  private void maybeResetInterruptStatus() {
+    if (!Thread.currentThread().isInterrupted()) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
   private void cleanupInputOutputs() {
     if (groupInputsMap != null) {
       groupInputsMap.clear();
@@ -726,6 +757,103 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   public synchronized void cleanup() {
+
+    /*
+     * Close any Input/Processor/Output (IPO) that the regular close() path never reached.
+     *
+     * close() sets processorClosed and removes each input/output from initializedInputs /
+     * initializedOutputs just before closing it, so anything close() already handled is
+     * not closed a second time here.
+     *
+     * What remains are the IPOs that were initialized but never reached by close(), e.g.
+     * because the task was interrupted or an earlier close() call threw. This is the last
+     * chance to close them and release their resources.
+     *
+     */
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processor closed={}", processorClosed);
+      LOG.debug("Num of inputs to be closed={}", initializedInputs.size());
+      LOG.debug("Num of outputs to be closed={}", initializedOutputs.size());
+    }
+    if (!processorClosed) {
+      try {
+        processorClosed = true;
+        processor.close();
+
+        LOG.info("Closed processor for vertex={}, index={}, interruptedStatus={}",
+            processor
+                .getContext().getTaskVertexName(),
+            processor.getContext().getTaskVertexIndex(),
+            Thread.currentThread().isInterrupted());
+
+        maybeResetInterruptStatus();
+      } catch (InterruptedException ie) {
+        //reset the status
+        LOG.info("Resetting interrupt for processor");
+        Thread.currentThread().interrupt();
+      } catch (Throwable e) {
+        LOG.warn("Exception when closing processor", e);
+      }
+    }
+    // Close the remaining inited Inputs.
+    Iterator<String> srcVertexItr = initializedInputs.keySet().iterator();
+    while (srcVertexItr.hasNext()) {
+      String srcVertexName = srcVertexItr.next();
+      try {
+        // remove() returns the input; a get() after removing the key would yield null.
+        LogicalInput input = initializedInputs.remove(srcVertexName);
+        if (input != null) {
+          ((InputFrameworkInterface) input).close();
+        }
+        maybeResetInterruptStatus();
+      } catch (InterruptedException ie) {
+        //reset the status
+        LOG.info("Resetting interrupt status for input with srcVertexName={}",
+            srcVertexName);
+        Thread.currentThread().interrupt();
+      } catch (Throwable e) {
+        LOG.warn("Exception when closing input in cleanup(interrupted)", e);
+      } finally {
+        LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
+            .getContext().getTaskVertexName(), srcVertexName, Thread.currentThread()
+            .isInterrupted());
+      }
+    }
+
+    // Close the remaining inited Outputs.
+    try {
+      Iterator<String> outVertexItr = initializedOutputs.keySet().iterator();
+      while (outVertexItr.hasNext()) {
+        String destVertexName = outVertexItr.next();
+        try {
+          // remove() returns the output; a get() after removing the key would yield null.
+          LogicalOutput output = initializedOutputs.remove(destVertexName);
+          if (output != null) {
+            ((OutputFrameworkInterface) output).close();
+          }
+          maybeResetInterruptStatus();
+        } catch (InterruptedException ie) {
+          //reset the status
+          LOG.info("Resetting interrupt status for output with destVertexName={}",
+              destVertexName);
+          Thread.currentThread().interrupt();
+        } catch (Throwable e) {
+          LOG.warn("Exception when closing output in cleanup(interrupted)", e);
+        } finally {
+          LOG.info("Close output for vertex={}, destVertex={}, interruptedStatus={}", processor
+              .getContext().getTaskVertexName(), destVertexName, Thread.currentThread()
+              .isInterrupted());
+        }
+      }
+    } catch (Throwable e) {
+      LOG.warn(Throwables.getStackTraceAsString(e));
+    }
+
+    if (LOG.isDebugEnabled()) {
+      printThreads();
+    }
+
+
     try {
       cleanupInputOutputs();
       closeContexts();
@@ -739,6 +867,20 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       eventRouterThread.interrupt();
     }
   }
+
+
+  /**
+   * Logs every live JVM thread (debug only); getThreadInfo() is null for exited threads.
+   */
+  void printThreads() {
+    ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+    for (long id : threadMXBean.getAllThreadIds()) {
+      ThreadInfo threadInfo = threadMXBean.getThreadInfo(id);
+      if (threadInfo != null) {
+        LOG.info("ThreadId : " + id + ", name=" + threadInfo.getThreadName());
+      }
+    }
+  }
   
   @Private
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/tez/blob/9338abfe/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index f8b8621..162caf0 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -72,6 +72,10 @@ public abstract class RuntimeTask {
 
   protected final AtomicReference<State> state = new AtomicReference<State>();
 
+  public boolean isRunning() {
+    return (state.get() == State.RUNNING);
+  }
+
   public TezCounters addAndGetTezCounter(String name) {
     TezCounters counter = new TezCounters();
     counterMap.put(name, counter);
@@ -143,4 +147,5 @@ public abstract class RuntimeTask {
     taskDone.set(true);
   }
 
+  public abstract void abortTask() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/9338abfe/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
index 33a7f4a..7238d5e 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -25,8 +25,13 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.base.Throwables;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSError;
@@ -35,6 +40,7 @@ import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.RuntimeTask;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.ObjectRegistry;
 import org.apache.tez.runtime.api.impl.EventMetaData;
@@ -61,6 +67,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
   private final ListeningExecutorService executor;
   private volatile ListenableFuture<Void> taskFuture;
   private volatile Thread waitingThread;
+  private volatile Thread taskRunner;
   private volatile Throwable firstException;
 
   // Effectively a duplicate check, since hadFatalError does the same thing.
@@ -96,7 +103,10 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
     taskReporter.registerTask(task, this);
     TaskRunnerCallable callable = new TaskRunnerCallable();
     Throwable failureCause = null;
-    taskFuture = executor.submit(callable);
+    if (Thread.currentThread().isInterrupted()) { // interrupted before start: skip the task
+      return isShutdownRequested();
+    }
+    taskFuture = executor.submit(callable); // awaited by taskFuture.get() just below
     try {
       taskFuture.get();
 
@@ -158,6 +168,10 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
         }
       }
     }
+    return isShutdownRequested();
+  }
+
+  private boolean isShutdownRequested() {
     if (shutdownRequested.get()) {
       LOG.info("Shutdown requested... returning");
       return false;
@@ -173,11 +187,14 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
           @Override
           public Void run() throws Exception {
             try {
+              taskRunner = Thread.currentThread();
               LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID());
               task.initialize();
               if (!Thread.currentThread().isInterrupted() && firstException == null) {
                 LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID());
                 task.run();
+                maybeInterruptWaitingThread();
+
                 LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID());
                 task.close();
                 task.setFrameworkCounters();
@@ -199,6 +216,12 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
               }
               return null;
             } catch (Throwable cause) {
+              if (Thread.currentThread().isInterrupted()) {
+                LOG.info("TaskRunnerCallable interrupted=" + Thread.currentThread().isInterrupted()
+                    + ", shutdownRequest=" + shutdownRequested.get());
+                Thread.currentThread().interrupt();
+                return null;
+              }
               if (cause instanceof FSError) {
                 // Not immediately fatal, this is an error reported by Hadoop FileSystem
                 maybeRegisterFirstException(cause);
@@ -255,6 +278,17 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
         taskRunning.set(false);
       }
     }
+
+    private void maybeInterruptWaitingThread() {
+      /*
+       * The processor may swallow the InterruptedException raised by taskRunner.interrupt().
+       * In that case, interrupt the waitingThread when shutdownRequested is set, so that the
+       * entire task gets cancelled.
+       */
+      if (shutdownRequested.get()) {
+        waitingThread.interrupt();
+      }
+    }
   }
 
   // should wait until all messages are sent to AM before TezChild shutdown
@@ -353,10 +387,43 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
     }
   }
 
+  private void abortRunningTask() {
+    if (!taskRunning.get()) {
+      LOG.info("Task is not running");
+      waitingThread.interrupt();
+      return;
+    }
+
+    if (taskRunning.get()) {
+      try {
+        task.abortTask();
+      } catch (Exception e) {
+        LOG.warn("Error when aborting the task", e);
+        try {
+          sendFailure(e, "Error when aborting the task");
+        } catch (Exception ignored) {
+          // Ignored.
+        }
+      }
+    }
+    //Interrupt the relevant threads.  TaskRunner should be interrupted preferably.
+    if (isTaskRunning()) {
+      LOG.info("Interrupting taskRunner=" + taskRunner.getName());
+      taskRunner.interrupt();
+    } else {
+      LOG.info("Interrupting waitingThread=" + waitingThread.getName());
+      waitingThread.interrupt();
+    }
+  }
+
+  private boolean isTaskRunning() {
+    return (taskRunning.get() && task.isRunning());
+  }
+
   @Override
   public void shutdownRequested() {
     shutdownRequested.set(true);
-    waitingThread.interrupt();
+    abortRunningTask();
   }
 
   private String getTaskDiagnosticsString(Throwable t, String message) {