You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/14 00:47:43 UTC

git commit: TEZ-442. Handle events generated by I/O initialize and close (part of TEZ-398). (sseth)

Updated Branches:
  refs/heads/TEZ-398 ca7abb76f -> 668728f90


TEZ-442. Handle events generated by I/O initialize and close (part of
TEZ-398). (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/668728f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/668728f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/668728f9

Branch: refs/heads/TEZ-398
Commit: 668728f906e434b7c2641b95efd7e18780ddd04f
Parents: ca7abb7
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Sep 13 15:47:15 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Sep 13 15:47:15 2013 -0700

----------------------------------------------------------------------
 .../LogicalIOProcessorRuntimeTask.java          | 85 +++++++++-----------
 1 file changed, 38 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/668728f9/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index 390b0a7..f69ea2d 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.Token;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.newapi.Event;
 import org.apache.tez.engine.newapi.Input;
@@ -45,6 +46,8 @@ import org.apache.tez.engine.newapi.Processor;
 import org.apache.tez.engine.newapi.TezInputContext;
 import org.apache.tez.engine.newapi.TezOutputContext;
 import org.apache.tez.engine.newapi.TezProcessorContext;
+import org.apache.tez.engine.newapi.impl.EventMetaData;
+import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.engine.newapi.impl.InputSpec;
 import org.apache.tez.engine.newapi.impl.OutputSpec;
 import org.apache.tez.engine.newapi.impl.TaskSpec;
@@ -81,14 +84,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private Map<String, LogicalInput> inputMap;
   private Map<String, LogicalOutput> outputMap;
 
-  private Map<String, List<Event>> initInputEventMap;
-  private Map<String, List<Event>> initOutputEventMap;
-
-  private Map<String, List<Event>> closeInputEventMap;
-  private Map<String, List<Event>> closeOutputEventMap;
-
-
-
   public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec,
       Configuration tezConf, TezUmbilical tezUmbilical,
       Token<JobTokenIdentifier> jobToken) throws IOException {
@@ -116,28 +111,20 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     inputMap = new LinkedHashMap<String, LogicalInput>(inputs.size());
     outputMap = new LinkedHashMap<String, LogicalOutput>(outputs.size());
 
-    initInputEventMap = new LinkedHashMap<String, List<Event>>(inputs.size());
-    initOutputEventMap = new LinkedHashMap<String, List<Event>>(outputs.size());
-
     // TODO Maybe close initialized inputs / outputs in case of failure to
     // initialize.
     // Initialize all inputs. TODO: Multi-threaded at some point.
     for (int i = 0; i < inputs.size(); i++) {
       String srcVertexName = inputSpecs.get(i).getSourceVertexName();
-      List<Event> initInputEvents = initializeInput(inputs.get(i),
+      initializeInput(inputs.get(i),
           inputSpecs.get(i));
-      // TODO Add null/event list checking here or in the actual executor.
-      initInputEventMap.put(srcVertexName, initInputEvents);
       inputMap.put(srcVertexName, inputs.get(i));
     }
 
     // Initialize all outputs. TODO: Multi-threaded at some point.
     for (int i = 0; i < outputs.size(); i++) {
       String destVertexName = outputSpecs.get(i).getDestinationVertexName();
-      List<Event> initOutputEvents = initializeOutput(outputs.get(i),
-          outputSpecs.get(i));
-      // TODO Add null/event list checking here or in the actual executor.
-      initOutputEventMap.put(destVertexName, initOutputEvents);
+      initializeOutput(outputs.get(i), outputSpecs.get(i));
       outputMap.put(destVertexName, outputs.get(i));
     }
 
@@ -145,16 +132,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     initializeLogicalIOProcessor();
   }
 
-  public Map<String, List<Event>> getInputInitEvents() {
-    Preconditions.checkState(this.state != State.NEW, "Not initialized yet");
-    return initInputEventMap;
-  }
-
-  public Map<String, List<Event>> getOutputInitEvents() {
-    Preconditions.checkState(this.state != State.NEW, "Not initialized yet");
-    return initOutputEventMap;
-  }
-
   public void run() throws Exception {
     synchronized (this.state) {
       Preconditions.checkState(this.state == State.INITED,
@@ -168,15 +145,15 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   public void close() throws Exception {
     Preconditions.checkState(this.state == State.RUNNING,
         "Can only run while in RUNNING state. Current: " + this.state);
-    this.state=State.CLOSED;
-    closeInputEventMap = new LinkedHashMap<String, List<Event>>(inputs.size());
-    closeOutputEventMap = new LinkedHashMap<String, List<Event>>(outputs.size());
+    this.state = State.CLOSED;
 
     // Close the Inputs.
     for (int i = 0; i < inputs.size(); i++) {
       String srcVertexName = inputSpecs.get(i).getSourceVertexName();
       List<Event> closeInputEvents = inputs.get(i).close();
-      closeInputEventMap.put(srcVertexName, closeInputEvents);
+      sendTaskGeneratedEvents(closeInputEvents,
+          EventProducerConsumerType.INPUT, taskSpec.getVertexName(),
+          srcVertexName, taskSpec.getTaskAttemptID());
     }
 
     // Close the Processor.
@@ -186,38 +163,37 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     for (int i = 0; i < outputs.size(); i++) {
       String destVertexName = outputSpecs.get(i).getDestinationVertexName();
       List<Event> closeOutputEvents = outputs.get(i).close();
-      closeOutputEventMap.put(destVertexName, closeOutputEvents);
+      sendTaskGeneratedEvents(closeOutputEvents,
+          EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
+          destVertexName, taskSpec.getTaskAttemptID());
     }
   }
 
-  public Map<String, List<Event>> getInputCloseEvents() {
-    Preconditions.checkState(this.state == State.CLOSED, "Not closed yet");
-    return closeInputEventMap;
-  }
-
-  public Map<String, List<Event>> getOutputCloseEvents() {
-    Preconditions.checkState(this.state == State.CLOSED, "Not closed yet");
-    return closeOutputEventMap;
-  }
-
-  private List<Event> initializeInput(Input input, InputSpec inputSpec)
+  private void initializeInput(Input input, InputSpec inputSpec)
       throws Exception {
     TezInputContext tezInputContext = createInputContext(inputSpec);
     if (input instanceof LogicalInput) {
       ((LogicalInput) input).setNumPhysicalInputs(inputSpec
           .getPhysicalEdgeCount());
     }
-    return input.initialize(tezInputContext);
+    List<Event> events = input.initialize(tezInputContext);
+    sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT,
+        tezInputContext.getTaskVertexName(),
+        tezInputContext.getSourceVertexName(), taskSpec.getTaskAttemptID());
   }
 
-  private List<Event> initializeOutput(Output output, OutputSpec outputSpec)
+  private void initializeOutput(Output output, OutputSpec outputSpec)
       throws Exception {
     TezOutputContext tezOutputContext = createOutputContext(outputSpec);
     if (output instanceof LogicalOutput) {
       ((LogicalOutput) output).setNumPhysicalOutputs(outputSpec
           .getPhysicalEdgeCount());
     }
-    return output.initialize(tezOutputContext);
+    List<Event> events = output.initialize(tezOutputContext);
+    sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
+        tezOutputContext.getTaskVertexName(),
+        tezOutputContext.getDestinationVertexName(),
+        taskSpec.getTaskAttemptID());
   }
 
   private void initializeLogicalIOProcessor() throws Exception {
@@ -299,6 +275,21 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     return (LogicalIOProcessor) processor;
   }
 
+  private void sendTaskGeneratedEvents(List<Event> events,
+      EventProducerConsumerType generator, String taskVertexName,
+      String edgeVertexName, TezTaskAttemptID taskAttemptID) {
+    if (events != null && events.size() > 0) {
+      EventMetaData eventMetaData = new EventMetaData(generator,
+          taskVertexName, edgeVertexName, taskAttemptID);
+      List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
+      for (Event e : events) {
+        TezEvent te = new TezEvent(e, eventMetaData);
+        tezEvents.add(te);
+      }
+      tezUmbilical.addEvents(tezEvents);
+    }
+  }
+
   public void handleEvent(TezEvent e) {
     switch (e.getDestinationInfo().getEventGenerator()) {
     case INPUT: