You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/14 00:47:43 UTC
git commit: TEZ-442. Handle events generated by I/O initialize and
close (part of TEZ-398). (sseth)
Updated Branches:
refs/heads/TEZ-398 ca7abb76f -> 668728f90
TEZ-442. Handle events generated by I/O initialize and close (part of
TEZ-398). (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/668728f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/668728f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/668728f9
Branch: refs/heads/TEZ-398
Commit: 668728f906e434b7c2641b95efd7e18780ddd04f
Parents: ca7abb7
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Sep 13 15:47:15 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Sep 13 15:47:15 2013 -0700
----------------------------------------------------------------------
.../LogicalIOProcessorRuntimeTask.java | 85 +++++++++-----------
1 file changed, 38 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/668728f9/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index 390b0a7..f69ea2d 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.common.security.JobTokenIdentifier;
import org.apache.tez.engine.newapi.Event;
import org.apache.tez.engine.newapi.Input;
@@ -45,6 +46,8 @@ import org.apache.tez.engine.newapi.Processor;
import org.apache.tez.engine.newapi.TezInputContext;
import org.apache.tez.engine.newapi.TezOutputContext;
import org.apache.tez.engine.newapi.TezProcessorContext;
+import org.apache.tez.engine.newapi.impl.EventMetaData;
+import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
import org.apache.tez.engine.newapi.impl.InputSpec;
import org.apache.tez.engine.newapi.impl.OutputSpec;
import org.apache.tez.engine.newapi.impl.TaskSpec;
@@ -81,14 +84,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
private Map<String, LogicalInput> inputMap;
private Map<String, LogicalOutput> outputMap;
- private Map<String, List<Event>> initInputEventMap;
- private Map<String, List<Event>> initOutputEventMap;
-
- private Map<String, List<Event>> closeInputEventMap;
- private Map<String, List<Event>> closeOutputEventMap;
-
-
-
public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec,
Configuration tezConf, TezUmbilical tezUmbilical,
Token<JobTokenIdentifier> jobToken) throws IOException {
@@ -116,28 +111,20 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
inputMap = new LinkedHashMap<String, LogicalInput>(inputs.size());
outputMap = new LinkedHashMap<String, LogicalOutput>(outputs.size());
- initInputEventMap = new LinkedHashMap<String, List<Event>>(inputs.size());
- initOutputEventMap = new LinkedHashMap<String, List<Event>>(outputs.size());
-
// TODO Maybe close initialized inputs / outputs in case of failure to
// initialize.
// Initialize all inputs. TODO: Multi-threaded at some point.
for (int i = 0; i < inputs.size(); i++) {
String srcVertexName = inputSpecs.get(i).getSourceVertexName();
- List<Event> initInputEvents = initializeInput(inputs.get(i),
+ initializeInput(inputs.get(i),
inputSpecs.get(i));
- // TODO Add null/event list checking here or in the actual executor.
- initInputEventMap.put(srcVertexName, initInputEvents);
inputMap.put(srcVertexName, inputs.get(i));
}
// Initialize all outputs. TODO: Multi-threaded at some point.
for (int i = 0; i < outputs.size(); i++) {
String destVertexName = outputSpecs.get(i).getDestinationVertexName();
- List<Event> initOutputEvents = initializeOutput(outputs.get(i),
- outputSpecs.get(i));
- // TODO Add null/event list checking here or in the actual executor.
- initOutputEventMap.put(destVertexName, initOutputEvents);
+ initializeOutput(outputs.get(i), outputSpecs.get(i));
outputMap.put(destVertexName, outputs.get(i));
}
@@ -145,16 +132,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
initializeLogicalIOProcessor();
}
- public Map<String, List<Event>> getInputInitEvents() {
- Preconditions.checkState(this.state != State.NEW, "Not initialized yet");
- return initInputEventMap;
- }
-
- public Map<String, List<Event>> getOutputInitEvents() {
- Preconditions.checkState(this.state != State.NEW, "Not initialized yet");
- return initOutputEventMap;
- }
-
public void run() throws Exception {
synchronized (this.state) {
Preconditions.checkState(this.state == State.INITED,
@@ -168,15 +145,15 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
public void close() throws Exception {
Preconditions.checkState(this.state == State.RUNNING,
"Can only run while in RUNNING state. Current: " + this.state);
- this.state=State.CLOSED;
- closeInputEventMap = new LinkedHashMap<String, List<Event>>(inputs.size());
- closeOutputEventMap = new LinkedHashMap<String, List<Event>>(outputs.size());
+ this.state = State.CLOSED;
// Close the Inputs.
for (int i = 0; i < inputs.size(); i++) {
String srcVertexName = inputSpecs.get(i).getSourceVertexName();
List<Event> closeInputEvents = inputs.get(i).close();
- closeInputEventMap.put(srcVertexName, closeInputEvents);
+ sendTaskGeneratedEvents(closeInputEvents,
+ EventProducerConsumerType.INPUT, taskSpec.getVertexName(),
+ srcVertexName, taskSpec.getTaskAttemptID());
}
// Close the Processor.
@@ -186,38 +163,37 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
for (int i = 0; i < outputs.size(); i++) {
String destVertexName = outputSpecs.get(i).getDestinationVertexName();
List<Event> closeOutputEvents = outputs.get(i).close();
- closeOutputEventMap.put(destVertexName, closeOutputEvents);
+ sendTaskGeneratedEvents(closeOutputEvents,
+ EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
+ destVertexName, taskSpec.getTaskAttemptID());
}
}
- public Map<String, List<Event>> getInputCloseEvents() {
- Preconditions.checkState(this.state == State.CLOSED, "Not closed yet");
- return closeInputEventMap;
- }
-
- public Map<String, List<Event>> getOutputCloseEvents() {
- Preconditions.checkState(this.state == State.CLOSED, "Not closed yet");
- return closeOutputEventMap;
- }
-
- private List<Event> initializeInput(Input input, InputSpec inputSpec)
+ private void initializeInput(Input input, InputSpec inputSpec)
throws Exception {
TezInputContext tezInputContext = createInputContext(inputSpec);
if (input instanceof LogicalInput) {
((LogicalInput) input).setNumPhysicalInputs(inputSpec
.getPhysicalEdgeCount());
}
- return input.initialize(tezInputContext);
+ List<Event> events = input.initialize(tezInputContext);
+ sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT,
+ tezInputContext.getTaskVertexName(),
+ tezInputContext.getSourceVertexName(), taskSpec.getTaskAttemptID());
}
- private List<Event> initializeOutput(Output output, OutputSpec outputSpec)
+ private void initializeOutput(Output output, OutputSpec outputSpec)
throws Exception {
TezOutputContext tezOutputContext = createOutputContext(outputSpec);
if (output instanceof LogicalOutput) {
((LogicalOutput) output).setNumPhysicalOutputs(outputSpec
.getPhysicalEdgeCount());
}
- return output.initialize(tezOutputContext);
+ List<Event> events = output.initialize(tezOutputContext);
+ sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
+ tezOutputContext.getTaskVertexName(),
+ tezOutputContext.getDestinationVertexName(),
+ taskSpec.getTaskAttemptID());
}
private void initializeLogicalIOProcessor() throws Exception {
@@ -299,6 +275,21 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
return (LogicalIOProcessor) processor;
}
+ private void sendTaskGeneratedEvents(List<Event> events,
+ EventProducerConsumerType generator, String taskVertexName,
+ String edgeVertexName, TezTaskAttemptID taskAttemptID) {
+ if (events != null && events.size() > 0) {
+ EventMetaData eventMetaData = new EventMetaData(generator,
+ taskVertexName, edgeVertexName, taskAttemptID);
+ List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
+ for (Event e : events) {
+ TezEvent te = new TezEvent(e, eventMetaData);
+ tezEvents.add(te);
+ }
+ tezUmbilical.addEvents(tezEvents);
+ }
+ }
+
public void handleEvent(TezEvent e) {
switch (e.getDestinationInfo().getEventGenerator()) {
case INPUT: