You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/18 04:10:19 UTC

git commit: TEZ-668. Allow Processors to control Input / Output start. (sseth)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 41bbbb3b0 -> e5ee79198


TEZ-668. Allow Processors to control Input / Output start. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/e5ee7919
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/e5ee7919
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/e5ee7919

Branch: refs/heads/master
Commit: e5ee791982805972829bca58dd05c76d9bc792db
Parents: 41bbbb3
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Feb 17 19:09:51 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Feb 17 19:09:51 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../java/org/apache/tez/runtime/api/Input.java  |  18 +-
 .../tez/runtime/api/MergedLogicalInput.java     |  14 +-
 .../java/org/apache/tez/runtime/api/Output.java |  17 +-
 .../apache/hadoop/mapred/YarnTezDagChild.java   |  12 +-
 .../mapreduce/examples/OrderedWordCount.java    |   4 -
 .../tez/mapreduce/examples/UnionExample.java    |  12 +
 .../tez/mapreduce/examples/WordCount.java       |  14 ++
 .../processor/FilterByWordInputProcessor.java   |   9 +-
 .../processor/FilterByWordOutputProcessor.java  |   9 +-
 .../org/apache/tez/mapreduce/input/MRInput.java |   4 +-
 .../apache/tez/mapreduce/output/MROutput.java   |   3 +-
 .../mapreduce/processor/map/MapProcessor.java   |   7 +-
 .../processor/reduce/ReduceProcessor.java       |   7 +
 .../tez/mapreduce/processor/MapUtils.java       |   6 +-
 .../processor/reduce/TestReduceProcessor.java   |   5 +-
 .../runtime/LogicalIOProcessorRuntimeTask.java  |  78 +++---
 .../TestLogicalIOProcessorRuntimeTask.java      | 245 +++++++++++++++++++
 .../runtime/library/input/LocalMergedInput.java |   3 +-
 .../library/input/ShuffledMergedInput.java      |  12 +-
 .../library/input/ShuffledUnorderedKVInput.java |  12 +-
 .../library/output/OnFileSortedOutput.java      |   9 +-
 .../library/output/OnFileUnorderedKVOutput.java |   3 +-
 .../library/processor/SleepProcessor.java       |   6 +
 .../java/org/apache/tez/test/TestInput.java     |   3 +-
 .../java/org/apache/tez/test/TestOutput.java    |   3 +-
 .../java/org/apache/tez/test/TestProcessor.java |   8 +
 27 files changed, 423 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 15a0711..86931be 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,6 +14,8 @@ INCOMPATIBLE CHANGES
 
   TEZ-827. Separate initialize and start operations on Inputs/Outputs.
 
+  TEZ-668. Allow Processors to control Input/Output start
+
 Release 0.2.0 - 2013-11-30
 
   First version.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
index 4afefd1..27f66d0 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
@@ -43,20 +43,22 @@ public interface Input {
       throws Exception;
 
   /**
-   * Start any processing that the Input may need to perform. This, for now, is
-   * always invoked by the framework.
+   * Start any processing that the Input may need to perform. It is the
+   * responsibility of the Processor to start Inputs.
    * 
-   * The implementation of Input is expected to be non blocking. Inputs should
-   * see this as a signal to start processing, but must return control to the
-   * framework before the Processor actually starts.
+   * This typically acts as a signal to Inputs to start any processing that they
+   * may require. A blocking implementation of this method should not be used
+   * as a mechanism to determine when an Input is actually ready.
    * 
-   * Inputs should be written to handle multiple start invocations - typically
+   * This method may be invoked by the framework under certain circumstances,
+   * and as such requires the implementation to be non-blocking.
+   * 
+   * Inputs must be written to handle multiple start invocations - typically
    * honoring only the first one.
    * 
-   * @return list of events that were generated during start
    * @throws Exception
    */
-  public List<Event> start() throws Exception;
+  public void start() throws Exception;
   
   /**
    * Gets an instance of the {@link Reader} for this <code>Output</code>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
index 405e0d8..1e82aca 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
@@ -19,7 +19,7 @@
 package org.apache.tez.runtime.api;
 
 import java.util.List;
-import com.google.common.collect.Lists;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * A LogicalInput that is used to merge the data from multiple inputs and provide a 
@@ -30,6 +30,7 @@ import com.google.common.collect.Lists;
 public abstract class MergedLogicalInput implements LogicalInput {
 
   private List<Input> inputs;
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
   
   public final void initialize(List<Input> inputs) {
     this.inputs = inputs;
@@ -45,15 +46,12 @@ public abstract class MergedLogicalInput implements LogicalInput {
   }
 
   @Override
-  public List<Event> start() throws Exception {
-    List<Event> events = Lists.newLinkedList();
-    for (Input input : inputs) {
-      List<Event> inputEvents = input.start();
-      if (inputEvents != null) {
-        events.addAll(inputEvents);
+  public final void start() throws Exception {
+    if (!isStarted.getAndSet(true)) {
+      for (Input input : inputs) {
+        input.start();
       }
     }
-    return events;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
index cfff7a1..e986c34 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
@@ -43,20 +43,21 @@ public interface Output {
       throws Exception;
 
   /**
-   * Start any processing that the Output may need to perform. This, for now, is
-   * always invoked by the framework.
+   * Start any processing that the Output may need to perform. It is the
+   * responsibility of the Processor to start Outputs.
    * 
-   * The implementation of Output is expected to be non blocking. Outputs should
-   * see this as a signal to start any processing that may be required, but must
-   * return control to the framework before the Processor actually starts.
+   * This typically acts as a signal to Outputs to start any processing that they
+   * may require.
    * 
-   * Outputs should be written to handle multiple start invocations - typically
+   * This method may be invoked by the framework under certain circumstances,
+   * and as such requires the implementation to be non-blocking.   
+   * 
+   * Outputs must be written to handle multiple start invocations - typically
    * honoring only the first one.
    * 
-   * @return list of events that were generated during initialization
    * @throws Exception
    */
-  public List<Event> start() throws Exception;
+  public void start() throws Exception;
 
   /**
    * Gets an instance of the {@link Writer} in an <code>Output</code>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 0e0d036..1288369 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -90,7 +90,9 @@ import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryModule;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 
@@ -118,6 +120,13 @@ public class YarnTezDagChild {
   private static Throwable heartbeatErrorException = null;
   // Implies that the task is done - and the AM is being informed.
   private static AtomicBoolean currentTaskComplete = new AtomicBoolean(true);
+  /**
+   * Used to maintain information about which Inputs have been started by the
+   * framework for the specific DAG. Makes an assumption that multiple DAGs do
+   * not execute concurrently, and must be reset each time the running DAG
+   * changes.
+   */
+  private static Multimap<String, String> startedInputsMap = HashMultimap.create();
 
   private static Thread startHeartbeatThread() {
     Thread heartbeatThread = new Thread(new Runnable() {
@@ -483,6 +492,7 @@ public class YarnTezDagChild {
             }
             if (!lastVertexId.getDAGId().equals(newVertexId.getDAGId())) {
               objectRegistry.clearCache(ObjectLifeCycle.DAG);
+              startedInputsMap = HashMultimap.create();
             }
           }
           lastVertexId = newVertexId;
@@ -643,7 +653,7 @@ public class YarnTezDagChild {
     LOG.info("LocalDirs for child: " + Arrays.toString(localDirs));
 
     return new LogicalIOProcessorRuntimeTask(taskSpec, attemptNum, conf,
-        tezUmbilical, serviceConsumerMetadata);
+        tezUmbilical, serviceConsumerMetadata, startedInputsMap);
   }
   
   // TODONEWTEZ Is this really required ?

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 3b5d573..c418bb2 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -18,7 +18,6 @@
 
 package org.apache.tez.mapreduce.examples;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
@@ -50,11 +49,8 @@ import org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.Apps;
 import org.apache.tez.client.AMConfiguration;
 import org.apache.tez.client.PreWarmContext;
 import org.apache.tez.client.TezClient;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
index 8a790fd..58443e8 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -116,6 +116,12 @@ public class UnionExample {
     public void run(Map<String, LogicalInput> inputs,
         Map<String, LogicalOutput> outputs) throws Exception {
       Preconditions.checkArgument(inputs.size() == 1);
+      for (LogicalInput input : inputs.values()) {
+        input.start();
+      }
+      for (LogicalOutput output : outputs.values()) {
+        output.start();
+      }
       boolean inUnion = true;
       if (context.getTaskVertexName().equals("map3")) {
         inUnion = false;
@@ -177,6 +183,12 @@ public class UnionExample {
         Map<String, LogicalOutput> outputs) throws Exception {
       Preconditions.checkArgument(inputs.size() == 2);
       Preconditions.checkArgument(outputs.size() == 2);
+      for (LogicalInput input : inputs.values()) {
+        input.start();
+      }
+      for (LogicalOutput output : outputs.values()) {
+        output.start();
+      }
       MROutput out = (MROutput) outputs.get("union");
       MROutput allParts = (MROutput) outputs.get("all-parts");
       KeyValueWriter kvWriter = out.getWriter();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index 523514d..de4cc6c 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -108,6 +108,12 @@ public class WordCount {
     @Override
     public void run(Map<String, LogicalInput> inputs,
         Map<String, LogicalOutput> outputs) throws Exception {
+      for (LogicalInput input : inputs.values()) {
+        input.start();
+      }
+      for (LogicalOutput output : outputs.values()) {
+        output.start();
+      }
       Preconditions.checkArgument(inputs.size() == 1);
       Preconditions.checkArgument(outputs.size() == 1);
       MRInput input = (MRInput) inputs.values().iterator().next();
@@ -146,6 +152,14 @@ public class WordCount {
     public void run(Map<String, LogicalInput> inputs,
         Map<String, LogicalOutput> outputs) throws Exception {
       Preconditions.checkArgument(inputs.size() == 1);
+
+      for (LogicalInput input : inputs.values()) {
+        input.start();
+      }
+      for (LogicalOutput output : outputs.values()) {
+        output.start();
+      }
+
       MROutput out = (MROutput) outputs.values().iterator().next();
       KeyValueWriter kvWriter = out.getWriter();
       KeyValuesReader kvReader = (KeyValuesReader) inputs.values().iterator().next().getReader();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
index 04dccb0..1d78366 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
@@ -73,7 +73,7 @@ public class FilterByWordInputProcessor implements LogicalIOProcessor {
   @Override
   public void run(Map<String, LogicalInput> inputs,
       Map<String, LogicalOutput> outputs) throws Exception {
-
+    
     if (inputs.size() != 1) {
       throw new IllegalStateException("FilterByWordInputProcessor processor can only work with a single input");
     }
@@ -81,6 +81,13 @@ public class FilterByWordInputProcessor implements LogicalIOProcessor {
     if (outputs.size() != 1) {
       throw new IllegalStateException("FilterByWordInputProcessor processor can only work with a single output");
     }
+    
+    for (LogicalInput input : inputs.values()) {
+      input.start();
+    }
+    for (LogicalOutput output : outputs.values()) {
+      output.start();
+    }
 
     LogicalInput li = inputs.values().iterator().next();
     if (! (li instanceof MRInput)) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
index 7355ab7..d061ea0 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
@@ -61,7 +61,7 @@ public class FilterByWordOutputProcessor implements LogicalIOProcessor {
   @Override
   public void run(Map<String, LogicalInput> inputs,
       Map<String, LogicalOutput> outputs) throws Exception {
-
+    
     if (inputs.size() != 1) {
       throw new IllegalStateException("FilterByWordOutputProcessor processor can only work with a single input");
     }
@@ -70,6 +70,13 @@ public class FilterByWordOutputProcessor implements LogicalIOProcessor {
       throw new IllegalStateException("FilterByWordOutputProcessor processor can only work with a single output");
     }
 
+    for (LogicalInput input : inputs.values()) {
+      input.start();
+    }
+    for (LogicalOutput output : outputs.values()) {
+      output.start();
+    }
+
     LogicalInput li = inputs.values().iterator().next();
     if (! (li instanceof ShuffledUnorderedKVInput)) {
       throw new IllegalStateException("FilterByWordOutputProcessor processor can only work with ShuffledUnorderedKVInput");

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 7108d37..edd8e92 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -18,7 +18,6 @@
 package org.apache.tez.mapreduce.input;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -159,8 +158,7 @@ public class MRInput implements LogicalInput {
   }
   
   @Override
-  public List<Event> start() {
-    return Collections.emptyList();
+  public void start() {
   }
 
   @Private

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 2d7b48a..f9405f9 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -191,8 +191,7 @@ public class MROutput implements LogicalOutput {
   }
   
   @Override
-  public List<Event> start() {
-    return null;
+  public void start() {
   }
 
   public void initCommitter(JobConf job, boolean useNewApi)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index 00a054e..b90cd11 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapred.MapRunnable;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.split.TezGroupedSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -91,6 +90,12 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
       Map<String, LogicalOutput> outputs) throws Exception {
 
     LOG.info("Running map: " + processorContext.getUniqueIdentifier());
+    for (LogicalInput input : inputs.values()) {
+      input.start();
+    }
+    for (LogicalOutput output : outputs.values()) {
+      output.start();
+    }
 
     if (inputs.size() != 1
         || outputs.size() != 1) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index bf4f660..f98e34f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -94,6 +94,13 @@ implements LogicalIOProcessor {
       Map<String, LogicalOutput> outputs) throws Exception {
 
     LOG.info("Running reduce: " + processorContext.getUniqueIdentifier());
+    
+    for (LogicalInput input : inputs.values()) {
+      input.start();
+    }
+    for (LogicalOutput output : outputs.values()) {
+      output.start();
+    }
 
     if (outputs.size() <= 0 || outputs.size() > 1) {
       throw new IOException("Invalid number of outputs"

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index e0a4c68..700acb0 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -62,6 +62,9 @@ import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 
+import com.google.common.collect.HashMultimap;
+
+
 public class MapUtils {
 
   private static final Log LOG = LogFactory.getLog(MapUtils.class);
@@ -216,7 +219,8 @@ public class MapUtils {
         0,
         jobConf,
         umbilical,
-        serviceConsumerMetadata);
+        serviceConsumerMetadata,
+        HashMultimap.<String, String>create());
     return task;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 7868ecc..5a2a83e 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -70,6 +70,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.HashMultimap;
+
 
 @SuppressWarnings("deprecation")
 public class TestReduceProcessor {
@@ -184,7 +186,8 @@ public class TestReduceProcessor {
         0,
         reduceConf,
         new TestUmbilical(),
-        serviceConsumerMetadata);
+        serviceConsumerMetadata,
+        HashMultimap.<String, String>create());
     
     task.initialize();
     task.run();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 21cc810..f5b414d 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -57,8 +57,9 @@ import org.apache.tez.runtime.api.Processor;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.TezProcessorContext;
-import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -67,12 +68,12 @@ import org.apache.tez.runtime.api.impl.TezInputContextImpl;
 import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
 import org.apache.tez.runtime.api.impl.TezProcessorContextImpl;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
-import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.common.resources.MemoryDistributor;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -108,6 +109,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   
   private final ExecutorService initializerExecutor;
   private final CompletionService<Void> initializerCompletionService;
+  
+  private final Multimap<String, String> startedInputsMap;
 
   private LinkedBlockingQueue<TezEvent> eventsToBeProcessed;
   private Thread eventRouterThread = null;
@@ -116,7 +119,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
   public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
       Configuration tezConf, TezUmbilical tezUmbilical,
-      Map<String, ByteBuffer> serviceConsumerMetadata) throws IOException {
+      Map<String, ByteBuffer> serviceConsumerMetadata,
+      Multimap<String, String> startedInputsMap) throws IOException {
     // TODO Remove jobToken from here post TEZ-421
     super(taskSpec, tezConf, tezUmbilical);
     LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
@@ -149,6 +153,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         this.initializerExecutor);
     this.groupInputSpecs = taskSpec.getGroupInputs();
     initialMemoryDistributor = new MemoryDistributor(numInputs, numOutputs, tezConf);
+    this.startedInputsMap = startedInputsMap;
   }
 
   /**
@@ -220,24 +225,26 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
             + " for start, since it will be controlled via it's Group");
         continue;
       }
-      numAutoStarts++;
-      this.initializerCompletionService.submit(new StartInputCallable(inputsMap.get(inputSpec
-          .getSourceVertexName()), inputSpec.getSourceVertexName(), taskSpec.getVertexName()));
-    }
-
-    if (groupInputSpecs != null) {
-      for (GroupInputSpec group : groupInputSpecs) {
+      if (!inputAlreadyStarted(taskSpec.getVertexName(), inputSpec.getSourceVertexName())) {
+        startedInputsMap.put(taskSpec.getVertexName(), inputSpec.getSourceVertexName());
         numAutoStarts++;
-        this.initializerCompletionService.submit(new StartInputCallable(groupInputsMap.get(group
-            .getGroupName()), group.getGroupName(), taskSpec.getVertexName()));
+        this.initializerCompletionService.submit(new StartInputCallable(inputsMap.get(inputSpec
+            .getSourceVertexName()), inputSpec.getSourceVertexName()));
+        LOG.info("Input: " + inputSpec.getSourceVertexName()
+            + " being auto started by the framework. Subsequent instances will not be auto-started");
       }
     }
 
-    for (OutputSpec outputSpec : outputSpecs) {
-      numAutoStarts++;
-      this.initializerCompletionService
-          .submit(new StartOutputCallable(outputsMap.get(outputSpec.getDestinationVertexName()),
-              outputContextMap.get(outputSpec.getDestinationVertexName())));
+    if (groupInputSpecs != null) {      
+      for (GroupInputSpec group : groupInputSpecs) {
+        if (!inputAlreadyStarted(taskSpec.getVertexName(), group.getGroupName())) {
+          numAutoStarts++;
+          this.initializerCompletionService.submit(new StartInputCallable(groupInputsMap.get(group
+              .getGroupName()), group.getGroupName()));
+          LOG.info("InputGroup: " + group.getGroupName()
+              + " being auto started by the framework. Subsequent instance will not be auto-started");
+        }
+      }
     }
 
     // Shutdown after all tasks complete.
@@ -362,46 +369,21 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private class StartInputCallable implements Callable<Void> {
     private final LogicalInput input;
     private final String srcVertexName;
-    private final String taskVertexName;
     
-    public StartInputCallable(LogicalInput input, String srcVertexName, String taskVertexName) {
+    public StartInputCallable(LogicalInput input, String srcVertexName) {
       this.input = input;
       this.srcVertexName = srcVertexName;
-      this.taskVertexName = taskVertexName;
     }
     
     @Override
     public Void call() throws Exception {
       LOG.info("Starting Input with src edge: " + srcVertexName);
-      List<Event> events = input.start();
-      sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT, taskVertexName,
-          srcVertexName, taskSpec.getTaskAttemptID());
+      input.start();
       LOG.info("Started Input with src edge: " + srcVertexName);
       return null;
     }
   }
 
-  private class StartOutputCallable implements Callable<Void> {
-    private final LogicalOutput output;
-    private final TezOutputContext outputContext;
-    
-    public StartOutputCallable(LogicalOutput output, TezOutputContext outputContext) {
-      this.output = output;
-      this.outputContext = outputContext;
-    }
-    
-    @Override
-    public Void call() throws Exception {
-      LOG.info("Starting Output with dest edge: " + outputContext.getDestinationVertexName());
-      List<Event> events = output.start();
-      sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
-          outputContext.getTaskVertexName(), outputContext.getDestinationVertexName(),
-          taskSpec.getTaskAttemptID());
-      LOG.info("Started Output with dest edge: " + outputContext.getDestinationVertexName());
-      return null;
-    }
-  }
-
   private class InitializeOutputCallable implements Callable<Void> {
 
     private final OutputSpec outputSpec;
@@ -435,6 +417,14 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     }
   }
 
+  /**
+   * Returns true if {@code edgeVertexName} is already recorded as started for {@code vertexName}.
+   */
+  private boolean inputAlreadyStarted(String vertexName, String edgeVertexName) {
+    // containsEntry(k, v) is equivalent to containsKey(k) && get(k).contains(v) on a Multimap.
+    return startedInputsMap.containsEntry(vertexName, edgeVertexName);
+  }
+
   private void initializeGroupInputs() {
     if (groupInputSpecs != null && !groupInputSpecs.isEmpty()) {
      groupInputsMap = new ConcurrentHashMap<String, LogicalInput>(groupInputSpecs.size());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
new file mode 100644
index 0000000..a1d2fa2
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalIOProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.Writer;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.junit.Test;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+
+/** Verifies that the framework auto-starts an Input only once per (vertex, edge) pair. */
+public class TestLogicalIOProcessorRuntimeTask {
+
+  @Test
+  public void testAutoStart() throws Exception {
+    // The counters are static; reset them so the assertions do not depend on test order.
+    TestProcessor.runCount = 0;
+    TestInput.startCount = 0;
+    TestOutput.startCount = 0;
+    TezDAGID dagId = createTezDagId();
+    TezVertexID vertexId = createTezVertexId(dagId);
+    Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
+    Multimap<String, String> startedInputsMap = HashMultimap.create();
+    TezUmbilical umbilical = mock(TezUmbilical.class);
+    TezConfiguration tezConf = new TezConfiguration();
+
+    TezTaskAttemptID taId1 = createTaskAttemptID(vertexId, 1);
+    TaskSpec task1 = createTaskSpec(taId1, "vertex1");
+
+    TezTaskAttemptID taId2 = createTaskAttemptID(vertexId, 2);
+    TaskSpec task2 = createTaskSpec(taId2, "vertex1");
+
+    LogicalIOProcessorRuntimeTask lio1 = new LogicalIOProcessorRuntimeTask(task1, 0, tezConf,
+        umbilical, serviceConsumerMetadata, startedInputsMap);
+
+    lio1.initialize();
+    lio1.run();
+    lio1.close();
+
+    // Input should've been started, Output should not have been started
+    assertEquals(1, TestProcessor.runCount);
+    assertEquals(1, TestInput.startCount);
+    assertEquals(0, TestOutput.startCount);
+
+    LogicalIOProcessorRuntimeTask lio2 = new LogicalIOProcessorRuntimeTask(task2, 0, tezConf,
+        umbilical, serviceConsumerMetadata, startedInputsMap);
+
+    lio2.initialize();
+    lio2.run();
+    lio2.close();
+
+    // Input should not have been started again, Output should not have been started
+    assertEquals(2, TestProcessor.runCount);
+    assertEquals(1, TestInput.startCount);
+    assertEquals(0, TestOutput.startCount);
+  }
+
+  private TaskSpec createTaskSpec(TezTaskAttemptID taskAttemptID, String vertexName) {
+    ProcessorDescriptor processorDesc = createProcessorDescriptor();
+    TaskSpec taskSpec = new TaskSpec(taskAttemptID, vertexName, processorDesc,
+        createInputSpecList(), createOutputSpecList(), null);
+    return taskSpec;
+  }
+
+  private List<InputSpec> createInputSpecList() {
+    InputDescriptor inputDesc = new InputDescriptor(TestInput.class.getName());
+    InputSpec inputSpec = new InputSpec("inedge", inputDesc, 1);
+    return Lists.newArrayList(inputSpec);
+  }
+
+  private List<OutputSpec> createOutputSpecList() {
+    OutputDescriptor outputDesc = new OutputDescriptor(TestOutput.class.getName());
+    OutputSpec outputSpec = new OutputSpec("outedge", outputDesc, 1);
+    return Lists.newArrayList(outputSpec);
+  }
+
+  private ProcessorDescriptor createProcessorDescriptor() {
+    ProcessorDescriptor desc = new ProcessorDescriptor(TestProcessor.class.getName());
+    return desc;
+  }
+
+  private TezTaskAttemptID createTaskAttemptID(TezVertexID vertexId, int taskIndex) {
+    TezTaskID taskId = TezTaskID.getInstance(vertexId, taskIndex);
+    TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, taskIndex);
+    return taskAttemptId;
+  }
+
+  private TezVertexID createTezVertexId(TezDAGID dagId) {
+    return TezVertexID.getInstance(dagId, 1);
+  }
+
+  private TezDAGID createTezDagId() {
+    return TezDAGID.getInstance("2000", 100, 1);
+  }
+
+  public static class TestProcessor implements LogicalIOProcessor {
+
+    public static volatile int runCount = 0;
+
+    public TestProcessor() {
+    }
+
+    @Override
+    public void initialize(TezProcessorContext processorContext) throws Exception {
+    }
+
+    @Override
+    public void handleEvents(List<Event> processorEvents) {
+      // Events are not used by this test processor.
+    }
+
+    @Override
+    public void close() throws Exception {
+      // No resources to release.
+    }
+
+    @Override
+    public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
+        throws Exception {
+      runCount++;
+    }
+
+  }
+
+  public static class TestInput implements LogicalInput {
+
+    public static volatile int startCount = 0;
+
+    public TestInput() {
+    }
+
+    @Override
+    public List<Event> initialize(TezInputContext inputContext) throws Exception {
+      inputContext.requestInitialMemory(0, null);
+      return null;
+    }
+
+    @Override
+    public void start() throws Exception {
+      startCount++;
+    }
+
+    @Override
+    public Reader getReader() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void handleEvents(List<Event> inputEvents) throws Exception {
+    }
+
+    @Override
+    public List<Event> close() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void setNumPhysicalInputs(int numInputs) {
+    }
+
+  }
+
+  public static class TestOutput implements LogicalOutput {
+
+    public static volatile int startCount = 0;
+
+    public TestOutput() {
+    }
+
+    @Override
+    public List<Event> initialize(TezOutputContext outputContext) throws Exception {
+      outputContext.requestInitialMemory(0, null);
+      return null;
+    }
+
+    @Override
+    public void start() throws Exception {
+      startCount++;
+    }
+
+    @Override
+    public Writer getWriter() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void handleEvents(List<Event> outputEvents) {
+    }
+
+    @Override
+    public List<Event> close() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void setNumPhysicalOutputs(int numOutputs) {
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
index 4d2441c..bbeae52 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
@@ -50,8 +50,7 @@ public class LocalMergedInput extends ShuffledMergedInputLegacy {
   }
   
   @Override
-  public List<Event> start() throws IOException {
-    return Collections.emptyList();
+  public void start() throws IOException {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index bee7e45..35c1ab5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.input;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -64,6 +65,8 @@ public class ShuffledMergedInput implements LogicalInput {
 
   private TezCounter inputKeyCounter;
   private TezCounter inputValueCounter;
+  
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
   @Override
   public List<Event> initialize(TezInputContext inputContext) throws IOException {
@@ -84,10 +87,11 @@ public class ShuffledMergedInput implements LogicalInput {
   }
 
   @Override
-  public List<Event> start() throws IOException {
-    // Start the shuffle - copy and merge
-    shuffle.run();
-    return Collections.emptyList();
+  public void start() throws IOException {
+    if (!isStarted.getAndSet(true)) {
+      // Start the shuffle - copy and merge
+      shuffle.run();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index 61dff6f..9123345 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -21,6 +21,7 @@ package org.apache.tez.runtime.library.input;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,6 +46,8 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
   @SuppressWarnings("rawtypes")
   private BroadcastKVReader kvReader;
   
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  
   public ShuffledUnorderedKVInput() {
   }
 
@@ -64,10 +67,11 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
   }
 
   @Override
-  public List<Event> start() throws IOException {
-    this.shuffleManager.run();
-    this.kvReader = this.shuffleManager.createReader();
-    return Collections.emptyList();
+  public void start() throws IOException {
+    if (!isStarted.getAndSet(true)) {
+      this.shuffleManager.run();
+      this.kvReader = this.shuffleManager.createReader();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 38f6bf9..06bafa5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -55,6 +56,7 @@ public class OnFileSortedOutput implements LogicalOutput {
   private long startTime;
   private long endTime;
   
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
   
     
   @Override
@@ -80,9 +82,10 @@ public class OnFileSortedOutput implements LogicalOutput {
   }
 
   @Override
-  public List<Event> start() throws Exception {
-    sorter.start();
-    return Collections.emptyList();
+  public void start() throws Exception {
+    if (!isStarted.getAndSet(true)) {
+      sorter.start();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index 04d638f..6cc6554 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -87,8 +87,7 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
   }
 
   @Override
-  public List<Event> start() {
-    return Collections.emptyList();
+  public void start() {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
index 2e3aba0..fada3fd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
@@ -62,6 +62,12 @@ public class SleepProcessor implements LogicalIOProcessor {
                   Map<String, LogicalOutput> outputs) throws Exception {
     LOG.info("Running the Sleep Processor, sleeping for "
       + timeToSleepMS + " ms");
+    for (LogicalInput input : inputs.values()) {
+      input.start();
+    }
+    for (LogicalOutput output : outputs.values()) {
+      output.start();
+    }
     try {
       Thread.sleep(timeToSleepMS);
     } catch (InterruptedException ie) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index a49f5b2..8ac1585 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -246,8 +246,7 @@ public class TestInput implements LogicalInput {
   }
 
   @Override
-  public List<Event> start() {
-    return Collections.emptyList();
+  public void start() {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
index ddd2a29..bef4218 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
@@ -52,8 +52,7 @@ public class TestOutput implements LogicalOutput {
   }
 
   @Override
-  public List<Event> start() {
-    return Collections.emptyList();
+  public void start() {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
index c314042..2ebe3f1 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
@@ -134,6 +134,14 @@ public class TestProcessor implements LogicalIOProcessor {
   public void run(Map<String, LogicalInput> inputs,
       Map<String, LogicalOutput> outputs) throws Exception {
     LOG.info("Sleeping ms: " + sleepMs);
+
+    for (LogicalInput input : inputs.values()) {
+      input.start();
+    }
+    for (LogicalOutput output : outputs.values()) {
+      output.start();
+    }
+
     Thread.sleep(sleepMs);
     
     if (doFail) {