You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2016/01/15 17:58:50 UTC
tez git commit: TEZ-2937. Can Processor.close() be called after
closing inputs and outputs? (jeagles)
Repository: tez
Updated Branches:
refs/heads/master d0348b03b -> b0ba133ff
TEZ-2937. Can Processor.close() be called after closing inputs and outputs? (jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b0ba133f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b0ba133f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b0ba133f
Branch: refs/heads/master
Commit: b0ba133ffcf1339b2c505878e9622084d4290bde
Parents: d0348b0
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Fri Jan 15 10:58:37 2016 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Fri Jan 15 10:58:37 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../runtime/LogicalIOProcessorRuntimeTask.java | 53 ++++++++++----------
2 files changed, 29 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/b0ba133f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d6a53aa..5226ea6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2937. Can Processor.close() be called after closing inputs and outputs?
TEZ-3037. History URL should be set regardless of which history logging service is enabled.
TEZ-3032. DAG start time getting logged using system time instead of recorded time in startTime field.
@@ -314,6 +315,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES
+ TEZ-2937. Can Processor.close() be called after closing inputs and outputs?
TEZ-3037. History URL should be set regardless of which history logging service is enabled.
TEZ-3032. DAG start time getting logged using system time instead of recorded time in startTime field.
TEZ-2129. Task and Attempt views should contain links to the logs
http://git-wip-us.apache.org/repos/asf/tez/blob/b0ba133f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index df09fdb..7f546e6 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -357,10 +357,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
"Can only run while in RUNNING state. Current: " + this.state);
this.state.set(State.CLOSED);
- // Close the Processor.
- processorClosed = true;
- processor.close();
-
// Close the Inputs.
for (InputSpec inputSpec : inputSpecs) {
String srcVertexName = inputSpec.getSourceVertexName();
@@ -380,6 +376,11 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
destVertexName, taskSpec.getTaskAttemptID());
}
+
+ // Close the Processor.
+ processorClosed = true;
+ processor.close();
+
} finally {
setTaskDone();
if (eventRouterThread != null) {
@@ -843,28 +844,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
LOG.debug("Num of outputs to be closed={}", initializedOutputs.size());
}
- // Close processor
- if (!processorClosed && processor != null) {
- try {
- processorClosed = true;
- processor.close();
- LOG.info("Closed processor for vertex={}, index={}, interruptedStatus={}",
- processor
- .getContext().getTaskVertexName(),
- processor.getContext().getTaskVertexIndex(),
- Thread.currentThread().isInterrupted());
- maybeResetInterruptStatus();
- } catch (InterruptedException ie) {
- //reset the status
- LOG.info("Resetting interrupt for processor");
- Thread.currentThread().interrupt();
- } catch (Throwable e) {
- LOG.warn(
- "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}" +
- e.getClass().getName(), e.getMessage());
- }
- }
-
// Close the remaining inited Inputs.
Iterator<Map.Entry<String, LogicalInput>> inputIterator = initializedInputs.entrySet().iterator();
while (inputIterator.hasNext()) {
@@ -917,6 +896,28 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
printThreads();
}
+ // Close processor
+ if (!processorClosed && processor != null) {
+ try {
+ processorClosed = true;
+ processor.close();
+ LOG.info("Closed processor for vertex={}, index={}, interruptedStatus={}",
+ processor
+ .getContext().getTaskVertexName(),
+ processor.getContext().getTaskVertexIndex(),
+ Thread.currentThread().isInterrupted());
+ maybeResetInterruptStatus();
+ } catch (InterruptedException ie) {
+ //reset the status
+ LOG.info("Resetting interrupt for processor");
+ Thread.currentThread().interrupt();
+ } catch (Throwable e) {
+ LOG.warn(
+ "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}" +
+ e.getClass().getName(), e.getMessage());
+ }
+ }
+
try {
closeContexts();
// Cleanup references which may be held by misbehaved tasks.