You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@tez.apache.org by je...@apache.org on 2016/01/15 17:58:50 UTC

tez git commit: TEZ-2937. Can Processor.close() be called after closing inputs and outputs? (jeagles)

Repository: tez
Updated Branches:
  refs/heads/master d0348b03b -> b0ba133ff


TEZ-2937. Can Processor.close() be called after closing inputs and outputs? (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b0ba133f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b0ba133f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b0ba133f

Branch: refs/heads/master
Commit: b0ba133ffcf1339b2c505878e9622084d4290bde
Parents: d0348b0
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Fri Jan 15 10:58:37 2016 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Fri Jan 15 10:58:37 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../runtime/LogicalIOProcessorRuntimeTask.java  | 53 ++++++++++----------
 2 files changed, 29 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b0ba133f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d6a53aa..5226ea6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2937. Can Processor.close() be called after closing inputs and outputs?
   TEZ-3037. History URL should be set regardless of which history logging service is enabled.
   TEZ-3032. DAG start time getting logged using system time instead of recorded time in startTime field.
 
@@ -314,6 +315,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES
+  TEZ-2937. Can Processor.close() be called after closing inputs and outputs?
   TEZ-3037. History URL should be set regardless of which history logging service is enabled.
   TEZ-3032. DAG start time getting logged using system time instead of recorded time in startTime field.
   TEZ-2129. Task and Attempt views should contain links to the logs

http://git-wip-us.apache.org/repos/asf/tez/blob/b0ba133f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index df09fdb..7f546e6 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -357,10 +357,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
           "Can only run while in RUNNING state. Current: " + this.state);
       this.state.set(State.CLOSED);
 
-      // Close the Processor.
-      processorClosed = true;
-      processor.close();
-
       // Close the Inputs.
       for (InputSpec inputSpec : inputSpecs) {
         String srcVertexName = inputSpec.getSourceVertexName();
@@ -380,6 +376,11 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
             EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
             destVertexName, taskSpec.getTaskAttemptID());
       }
+
+      // Close the Processor.
+      processorClosed = true;
+      processor.close();
+
     } finally {
       setTaskDone();
       if (eventRouterThread != null) {
@@ -843,28 +844,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       LOG.debug("Num of outputs to be closed={}", initializedOutputs.size());
     }
 
-    // Close processor
-    if (!processorClosed && processor != null) {
-      try {
-        processorClosed = true;
-        processor.close();
-        LOG.info("Closed processor for vertex={}, index={}, interruptedStatus={}",
-            processor
-                .getContext().getTaskVertexName(),
-            processor.getContext().getTaskVertexIndex(),
-            Thread.currentThread().isInterrupted());
-        maybeResetInterruptStatus();
-      } catch (InterruptedException ie) {
-        //reset the status
-        LOG.info("Resetting interrupt for processor");
-        Thread.currentThread().interrupt();
-      } catch (Throwable e) {
-        LOG.warn(
-            "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}" +
-                e.getClass().getName(), e.getMessage());
-      }
-    }
-
     // Close the remaining inited Inputs.
     Iterator<Map.Entry<String, LogicalInput>> inputIterator = initializedInputs.entrySet().iterator();
     while (inputIterator.hasNext()) {
@@ -917,6 +896,28 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       printThreads();
     }
 
+    // Close processor
+    if (!processorClosed && processor != null) {
+      try {
+        processorClosed = true;
+        processor.close();
+        LOG.info("Closed processor for vertex={}, index={}, interruptedStatus={}",
+            processor
+            .getContext().getTaskVertexName(),
+            processor.getContext().getTaskVertexIndex(),
+            Thread.currentThread().isInterrupted());
+        maybeResetInterruptStatus();
+      } catch (InterruptedException ie) {
+        //reset the status
+        LOG.info("Resetting interrupt for processor");
+        Thread.currentThread().interrupt();
+      } catch (Throwable e) {
+        LOG.warn(
+            "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}" +
+            e.getClass().getName(), e.getMessage());
+      }
+    }
+
     try {
       closeContexts();
       // Cleanup references which may be held by misbehaved tasks.