You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/05/28 03:45:30 UTC
tez git commit: TEZ-2483. Tez should close task if processor fail (zjffdu)
Repository: tez
Updated Branches:
refs/heads/master fe74d6b9e -> 02aafb55e
TEZ-2483. Tez should close task if processor fail (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/02aafb55
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/02aafb55
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/02aafb55
Branch: refs/heads/master
Commit: 02aafb55e9b1adc3384adaccf44cba712a753fac
Parents: fe74d6b
Author: Jeff Zhang <zj...@apache.org>
Authored: Thu May 28 09:45:20 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Thu May 28 09:45:20 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../runtime/LogicalIOProcessorRuntimeTask.java | 78 +++++++++++++++++++-
2 files changed, 75 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/02aafb55/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f676ef7..c9318d2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -220,6 +220,7 @@ Release 0.6.2: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2483. Tez should close task if processor fail
Release 0.6.1: 2015-05-18
http://git-wip-us.apache.org/repos/asf/tez/blob/02aafb55/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 24f62a0..84e5e0d 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -114,6 +115,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
final List<GroupInputSpec> groupInputSpecs;
ConcurrentHashMap<String, MergedLogicalInput> groupInputsMap;
+ final ConcurrentHashMap<String, LogicalInput> initializedInputs;
+ final ConcurrentHashMap<String, LogicalOutput> initializedOutputs;
+
+ private boolean processorClosed = false;
final ProcessorDescriptor processorDescriptor;
AbstractLogicalIOProcessor processor;
ProcessorContext processorContext;
@@ -163,6 +168,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
this.outputsMap = new ConcurrentHashMap<String, LogicalOutput>(numOutputs);
this.outputContextMap = new ConcurrentHashMap<String, OutputContext>(numOutputs);
+ this.initializedInputs = new ConcurrentHashMap<String, LogicalInput>();
+ this.initializedOutputs = new ConcurrentHashMap<String, LogicalOutput>();
+
this.runInputMap = new LinkedHashMap<String, LogicalInput>();
this.runOutputMap = new LinkedHashMap<String, LogicalOutput>();
@@ -344,11 +352,13 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
this.state.set(State.CLOSED);
// Close the Processor.
+ processorClosed = true;
processor.close();
// Close the Inputs.
for (InputSpec inputSpec : inputSpecs) {
String srcVertexName = inputSpec.getSourceVertexName();
+ initializedInputs.remove(srcVertexName);
List<Event> closeInputEvents = ((InputFrameworkInterface)inputsMap.get(srcVertexName)).close();
sendTaskGeneratedEvents(closeInputEvents,
EventProducerConsumerType.INPUT, taskSpec.getVertexName(),
@@ -358,6 +368,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
// Close the Outputs.
for (OutputSpec outputSpec : outputSpecs) {
String destVertexName = outputSpec.getDestinationVertexName();
+ initializedOutputs.remove(destVertexName);
List<Event> closeOutputEvents = ((LogicalOutputFrameworkInterface)outputsMap.get(destVertexName)).close();
sendTaskGeneratedEvents(closeOutputEvents,
EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
@@ -407,6 +418,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT,
inputContext.getTaskVertexName(), inputContext.getSourceVertexName(),
taskSpec.getTaskAttemptID());
+ initializedInputs.put(edgeName, input);
LOG.info("Initialized Input with src edge: " + edgeName);
return null;
}
@@ -455,6 +467,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
outputContext.getTaskVertexName(),
outputContext.getDestinationVertexName(), taskSpec.getTaskAttemptID());
+ initializedOutputs.put(edgeName, output);
LOG.info("Initialized Output with dest edge: " + edgeName);
return null;
}
@@ -748,6 +761,65 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
eventRouterThread = null;
}
+
+ // Close any Input/Processor/Output (IPO) that has not been closed yet
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processor closed={}", processorClosed);
+ LOG.debug("Num of inputs to be closed={}", initializedInputs.size());
+ LOG.debug("Num of outputs to be closed={}", initializedOutputs.size());
+ }
+ // Close processor
+ if (!processorClosed && processor != null) {
+ try {
+ processorClosed = true;
+ processor.close();
+ LOG.info("Closed processor for vertex={}, index={}",
+ processor
+ .getContext().getTaskVertexName(),
+ processor.getContext().getTaskVertexIndex());
+ } catch (Throwable e) {
+ LOG.warn(
+ "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}",
+ e.getClass().getName(), e.getMessage());
+ }
+ }
+
+ // Close any initialized Inputs that have not been closed yet.
+ Iterator<Map.Entry<String, LogicalInput>> inputIterator = initializedInputs.entrySet().iterator();
+ while (inputIterator.hasNext()) {
+ Map.Entry<String, LogicalInput> entry = inputIterator.next();
+ String srcVertexName = entry.getKey();
+ inputIterator.remove();
+ try {
+ ((InputFrameworkInterface)entry.getValue()).close();
+ } catch (Throwable e) {
+ LOG.warn(
+ "Ignoring exception when closing input {}(cleanup). Exception class={}, message={}",
+ srcVertexName, e.getClass().getName(), e.getMessage());
+ } finally {
+ LOG.info("Close input for vertex={}, sourceVertex={}", processor
+ .getContext().getTaskVertexName(), srcVertexName);
+ }
+ }
+
+ // Close any initialized Outputs that have not been closed yet.
+ Iterator<Map.Entry<String, LogicalOutput>> outputIterator = initializedOutputs.entrySet().iterator();
+ while (outputIterator.hasNext()) {
+ Map.Entry<String, LogicalOutput> entry = outputIterator.next();
+ String destVertexName = entry.getKey();
+ outputIterator.remove();
+ try {
+ ((OutputFrameworkInterface) entry.getValue()).close();
+ } catch (Throwable e) {
+ LOG.warn(
+ "Ignoring exception when closing output {}(cleanup). Exception class={}, message={}",
+ destVertexName, e.getClass().getName(), e.getMessage());
+ } finally {
+ LOG.info("Close input for vertex={}, sourceVertex={}", processor
+ .getContext().getTaskVertexName(), destVertexName);
+ }
+ }
+
try {
closeContexts();
// Cleanup references which may be held by misbehaved tasks.
@@ -764,10 +836,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
inputsMap.clear();
outputsMap.clear();
-
-
- inputsMap.clear();
- outputsMap.clear();
+ initializedInputs.clear();
+ initializedOutputs.clear();
inputContextMap.clear();
outputContextMap.clear();