You are viewing a plain-text version of this content; the link to the canonical version is not preserved in this view.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/05/28 03:45:30 UTC

tez git commit: TEZ-2483. Tez should close task if the processor fails (zjffdu)

Repository: tez
Updated Branches:
  refs/heads/master fe74d6b9e -> 02aafb55e


TEZ-2483. Tez should close task if the processor fails (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/02aafb55
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/02aafb55
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/02aafb55

Branch: refs/heads/master
Commit: 02aafb55e9b1adc3384adaccf44cba712a753fac
Parents: fe74d6b
Author: Jeff Zhang <zj...@apache.org>
Authored: Thu May 28 09:45:20 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Thu May 28 09:45:20 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../runtime/LogicalIOProcessorRuntimeTask.java  | 78 +++++++++++++++++++-
 2 files changed, 75 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/02aafb55/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f676ef7..c9318d2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -220,6 +220,7 @@ Release 0.6.2: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2483. Tez should close task if processor fail
 
 Release 0.6.1: 2015-05-18
 

http://git-wip-us.apache.org/repos/asf/tez/blob/02aafb55/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 24f62a0..84e5e0d 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -114,6 +115,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   final List<GroupInputSpec> groupInputSpecs;
   ConcurrentHashMap<String, MergedLogicalInput> groupInputsMap;
 
+  final ConcurrentHashMap<String, LogicalInput> initializedInputs;
+  final ConcurrentHashMap<String, LogicalOutput> initializedOutputs;
+
+  private boolean processorClosed = false;
   final ProcessorDescriptor processorDescriptor;
   AbstractLogicalIOProcessor processor;
   ProcessorContext processorContext;
@@ -163,6 +168,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.outputsMap = new ConcurrentHashMap<String, LogicalOutput>(numOutputs);
     this.outputContextMap = new ConcurrentHashMap<String, OutputContext>(numOutputs);
 
+    this.initializedInputs = new ConcurrentHashMap<String, LogicalInput>();
+    this.initializedOutputs = new ConcurrentHashMap<String, LogicalOutput>();
+
     this.runInputMap = new LinkedHashMap<String, LogicalInput>();
     this.runOutputMap = new LinkedHashMap<String, LogicalOutput>();
 
@@ -344,11 +352,13 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       this.state.set(State.CLOSED);
 
       // Close the Processor.
+      processorClosed = true;
       processor.close();
 
       // Close the Inputs.
       for (InputSpec inputSpec : inputSpecs) {
         String srcVertexName = inputSpec.getSourceVertexName();
+        initializedInputs.remove(srcVertexName);
         List<Event> closeInputEvents = ((InputFrameworkInterface)inputsMap.get(srcVertexName)).close();
         sendTaskGeneratedEvents(closeInputEvents,
             EventProducerConsumerType.INPUT, taskSpec.getVertexName(),
@@ -358,6 +368,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       // Close the Outputs.
       for (OutputSpec outputSpec : outputSpecs) {
         String destVertexName = outputSpec.getDestinationVertexName();
+        initializedOutputs.remove(destVertexName);
         List<Event> closeOutputEvents = ((LogicalOutputFrameworkInterface)outputsMap.get(destVertexName)).close();
         sendTaskGeneratedEvents(closeOutputEvents,
             EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
@@ -407,6 +418,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT,
           inputContext.getTaskVertexName(), inputContext.getSourceVertexName(),
           taskSpec.getTaskAttemptID());
+      initializedInputs.put(edgeName, input);
       LOG.info("Initialized Input with src edge: " + edgeName);
       return null;
     }
@@ -455,6 +467,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
           outputContext.getTaskVertexName(),
           outputContext.getDestinationVertexName(), taskSpec.getTaskAttemptID());
+      initializedOutputs.put(edgeName, output);
       LOG.info("Initialized Output with dest edge: " + edgeName);
       return null;
     }
@@ -748,6 +761,65 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       }
       eventRouterThread = null;
     }
+
+    // Close the unclosed IPO
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processor closed={}", processorClosed);
+      LOG.debug("Num of inputs to be closed={}", initializedInputs.size());
+      LOG.debug("Num of outputs to be closed={}", initializedOutputs.size());
+    }
+    // Close processor
+    if (!processorClosed && processor != null) {
+      try {
+        processorClosed = true;
+        processor.close();
+        LOG.info("Closed processor for vertex={}, index={}",
+            processor
+                .getContext().getTaskVertexName(),
+            processor.getContext().getTaskVertexIndex());
+      } catch (Throwable e) {
+        LOG.warn(
+            "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}",
+                e.getClass().getName(), e.getMessage());
+      }
+    }
+
+    // Close the remaining inited Inputs.
+    Iterator<Map.Entry<String, LogicalInput>> inputIterator = initializedInputs.entrySet().iterator();
+    while (inputIterator.hasNext()) {
+      Map.Entry<String, LogicalInput> entry = inputIterator.next();
+      String srcVertexName = entry.getKey();
+      inputIterator.remove();
+      try {
+        ((InputFrameworkInterface)entry.getValue()).close();
+      } catch (Throwable e) {
+        LOG.warn(
+            "Ignoring exception when closing input {}(cleanup). Exception class={}, message={}",
+            srcVertexName, e.getClass().getName(), e.getMessage());
+      } finally {
+        LOG.info("Close input for vertex={}, sourceVertex={}", processor
+            .getContext().getTaskVertexName(), srcVertexName);
+      }
+    }
+
+    // Close the remaining inited Outputs.
+    Iterator<Map.Entry<String, LogicalOutput>> outputIterator = initializedOutputs.entrySet().iterator();
+    while (outputIterator.hasNext()) {
+      Map.Entry<String, LogicalOutput> entry = outputIterator.next();
+      String destVertexName = entry.getKey();
+      outputIterator.remove();
+      try {
+        ((OutputFrameworkInterface) entry.getValue()).close();
+      } catch (Throwable e) {
+        LOG.warn(
+            "Ignoring exception when closing output {}(cleanup). Exception class={}, message={}",
+            destVertexName, e.getClass().getName(), e.getMessage());
+      } finally {
+        LOG.info("Close input for vertex={}, sourceVertex={}", processor
+            .getContext().getTaskVertexName(), destVertexName);
+      }
+    }
+
     try {
       closeContexts();
       // Cleanup references which may be held by misbehaved tasks.
@@ -764,10 +836,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     inputsMap.clear();
     outputsMap.clear();
 
-
-
-    inputsMap.clear();
-    outputsMap.clear();
+    initializedInputs.clear();
+    initializedOutputs.clear();
 
     inputContextMap.clear();
     outputContextMap.clear();