You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/12 22:57:34 UTC

git commit: TEZ-827. Separate initialize and start operations on Inputs/Outputs. (sseth)

Updated Branches:
  refs/heads/master 54e232211 -> 8d265ed46


TEZ-827. Separate initialize and start operations on Inputs/Outputs.
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8d265ed4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8d265ed4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8d265ed4

Branch: refs/heads/master
Commit: 8d265ed4686f646f53ca388cf91d4966815f1eaf
Parents: 54e2322
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Feb 12 13:57:01 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Feb 12 13:57:24 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../java/org/apache/tez/runtime/api/Input.java  |  16 +++
 .../tez/runtime/api/MergedLogicalInput.java     |  10 +-
 .../java/org/apache/tez/runtime/api/Output.java |  16 +++
 .../apache/tez/runtime/api/TezInputContext.java |   1 -
 .../org/apache/tez/mapreduce/input/MRInput.java |   6 ++
 .../apache/tez/mapreduce/output/MROutput.java   |   6 ++
 .../processor/reduce/TestReduceProcessor.java   |   1 +
 .../runtime/LogicalIOProcessorRuntimeTask.java  | 107 ++++++++++++++++++-
 .../org/apache/tez/runtime/RuntimeUtils.java    |   3 +-
 .../api/impl/TezProcessorContextImpl.java       |   1 -
 .../library/input/ShuffledMergedInput.java      |   6 ++
 .../library/input/ShuffledUnorderedKVInput.java |   6 ++
 .../library/output/OnFileSortedOutput.java      |   6 ++
 .../library/output/OnFileUnorderedKVOutput.java |   6 ++
 .../java/org/apache/tez/test/TestInput.java     |   6 ++
 .../java/org/apache/tez/test/TestOutput.java    |   6 ++
 17 files changed, 196 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 86f5394..15a0711 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,8 @@ INCOMPATIBLE CHANGES
   TEZ-650. MRHelpers.createMRInputPayloadWithGrouping() methods should not
   take an MRSplitsProto argument.
 
+  TEZ-827. Separate initialize and start operations on Inputs/Outputs.
+
 Release 0.2.0 - 2013-11-30
 
   First version.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
index c064367..4afefd1 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
@@ -43,6 +43,22 @@ public interface Input {
       throws Exception;
 
   /**
+   * Start any processing that the Input may need to perform. This, for now, is
+   * always invoked by the framework.
+   * 
+   * This method is expected to be non-blocking. Inputs should
+   * see this as a signal to start processing, but must return control to the
+   * framework before the Processor actually starts.
+   * 
+   * Inputs should be written to handle multiple start invocations - typically
+   * honoring only the first one.
+   * 
+   * @return list of events that were generated during start
+   * @throws Exception
+   */
+  public List<Event> start() throws Exception;
+  
+  /**
    * Gets an instance of the {@link Reader} for this <code>Output</code>
    *
    * @return Gets an instance of the {@link Reader} for this <code>Output</code>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
index b879b4c..d91a863 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
@@ -30,7 +30,7 @@ public abstract class MergedLogicalInput implements LogicalInput {
 
   private List<Input> inputs;
   
-  public void initialize(List<Input> inputs) {
+  public final void initialize(List<Input> inputs) {
     this.inputs = inputs;
   }
   
@@ -44,6 +44,14 @@ public abstract class MergedLogicalInput implements LogicalInput {
   }
 
   @Override
+  public List<Event> start() throws Exception {
+    for (Input input : inputs) {
+      input.start();
+    }
+    return null;
+  }
+
+  @Override
   public final void handleEvents(List<Event> inputEvents) throws Exception {
     throw new UnsupportedOperationException();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
index f61e5b3..cfff7a1 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
@@ -43,6 +43,22 @@ public interface Output {
       throws Exception;
 
   /**
+   * Start any processing that the Output may need to perform. This, for now, is
+   * always invoked by the framework.
+   * 
+   * This method is expected to be non-blocking. Outputs should
+   * see this as a signal to start any processing that may be required, but must
+   * return control to the framework before the Processor actually starts.
+   * 
+   * Outputs should be written to handle multiple start invocations - typically
+   * honoring only the first one.
+   * 
+   * @return list of events that were generated during start
+   * @throws Exception
+   */
+  public List<Event> start() throws Exception;
+
+  /**
    * Gets an instance of the {@link Writer} in an <code>Output</code>
    *
    * @return Gets an instance of the {@link Writer} in an <code>Output</code>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
index 79731b5..0e37030 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
@@ -35,5 +35,4 @@ public interface TezInputContext extends TezTaskContext {
    * @return index
    */
   public int getInputIndex();
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index dda57fc..70003ef 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -157,6 +157,12 @@ public class MRInput implements LogicalInput {
     initializeInternal();
     return null;
   }
+  
+  @Override
+  public List<Event> start() {
+    // TODO TEZ-815 To be fixed in a subsequent jira if required.
+    return null;
+  }
 
   @Private
   void initializeInternal() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index e5d223d..efa18b4 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -190,6 +190,12 @@ public class MROutput implements LogicalOutput {
         + ", using_new_api: " + useNewApi);
     return null;
   }
+  
+  @Override
+  public List<Event> start() {
+    // TODO TEZ-815 To be fixed in a subsequent jira if required.
+    return null;
+  }
 
   public void initCommitter(JobConf job, boolean useNewApi)
       throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 1981872..2f2ec16 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.tez.mapreduce.processor.reduce;
 
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 62de3ae..a8bb1d4 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -150,6 +150,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     initialMemoryDistributor = new MemoryDistributor(numInputs, numOutputs, tezConf);
   }
 
+  /**
+   * @throws Exception if initializing or starting an Input, Output or the Processor fails
+   */
   public void initialize() throws Exception {
     LOG.info("Initializing LogicalProcessorIORuntimeTask");
     Preconditions.checkState(this.state == State.NEW, "Already initialized");
@@ -170,8 +173,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
           new InitializeOutputCallable(outputSpec, outputIndex++));
       numTasks++;
     }
-    // Shutdown after all tasks complete.
-    this.initializerExecutor.shutdown();
     
     // Initialize processor in the current thread.
     initializeLogicalIOProcessor();
@@ -192,11 +193,11 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       }
     }
     LOG.info("All initializers finished");
-    initialMemoryDistributor.makeInitialAllocations();
-
     // group inputs depend on inputs beings initialized. So must be done after.
     initializeGroupInputs();
+    // Grouped input start will be controlled by the start of the GroupedInput
     
+    // Construct the set of groupedInputs up front so that start is not invoked on them.
     Set<String> groupInputs = Sets.newHashSet();
     // Construct Inputs/Outputs map argument for processor.run()
     // first add the group inputs
@@ -207,6 +208,59 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         groupInputs.addAll(groupInputSpec.getGroupVertices());
       }
     }
+
+    initialMemoryDistributor.makeInitialAllocations();
+
+    LOG.info("Starting Inputs/Outputs");
+    int numAutoStarts = 0;
+    for (InputSpec inputSpec : inputSpecs) {
+      if (groupInputs.contains(inputSpec.getSourceVertexName())) {
+        LOG.info("Ignoring " + inputSpec.getSourceVertexName()
+            + " for start, since it will be controlled via it's Group");
+        continue;
+      }
+      numAutoStarts++;
+      this.initializerCompletionService.submit(new StartInputCallable(inputsMap.get(inputSpec
+          .getSourceVertexName()), inputSpec.getSourceVertexName(), taskSpec.getVertexName()));
+    }
+
+    if (groupInputSpecs != null) {
+      for (GroupInputSpec group : groupInputSpecs) {
+        numAutoStarts++;
+        this.initializerCompletionService.submit(new StartInputCallable(groupInputsMap.get(group
+            .getGroupName()), group.getGroupName(), taskSpec.getVertexName()));
+      }
+    }
+
+    for (OutputSpec outputSpec : outputSpecs) {
+      numAutoStarts++;
+      this.initializerCompletionService
+          .submit(new StartOutputCallable(outputsMap.get(outputSpec.getDestinationVertexName()),
+              outputContextMap.get(outputSpec.getDestinationVertexName())));
+    }
+
+    // Shutdown after all tasks complete.
+    this.initializerExecutor.shutdown();
+    
+    completedTasks = 0;
+    LOG.info("Num IOs determined for AutoStart: " + numAutoStarts);
+    while (completedTasks < numAutoStarts) {
+      LOG.info("Waiting for " + (numAutoStarts - completedTasks) + " IOs to start");
+      Future<Void> future = initializerCompletionService.take();
+      try {
+        future.get();
+        completedTasks++;
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof Exception) {
+          throw (Exception) e.getCause();
+        } else {
+          throw new Exception(e);
+        }
+      }
+    }
+
+    
+
     // then add the non-grouped inputs
     for (InputSpec inputSpec : inputSpecs) {
       if (!groupInputs.contains(inputSpec.getSourceVertexName())) {
@@ -214,7 +268,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         runInputMap.put(inputSpec.getSourceVertexName(), input);
       }
     }
-    
+
     for (OutputSpec outputSpec : outputSpecs) {
       LogicalOutput output = outputsMap.get(outputSpec.getDestinationVertexName());
       String outputName = outputSpec.getDestinationVertexName();
@@ -304,6 +358,49 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     }
   }
 
+  private class StartInputCallable implements Callable<Void> {
+    private final LogicalInput input;
+    private final String srcVertexName;
+    private final String taskVertexName;
+    
+    public StartInputCallable(LogicalInput input, String srcVertexName, String taskVertexName) {
+      this.input = input;
+      this.srcVertexName = srcVertexName;
+      this.taskVertexName = taskVertexName;
+    }
+    
+    @Override
+    public Void call() throws Exception {
+      LOG.info("Starting Input with src edge: " + srcVertexName);
+      List<Event> events = input.start();
+      sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT, taskVertexName,
+          srcVertexName, taskSpec.getTaskAttemptID());
+      LOG.info("Started Input with src edge: " + srcVertexName);
+      return null;
+    }
+  }
+
+  private class StartOutputCallable implements Callable<Void> {
+    private final LogicalOutput output;
+    private final TezOutputContext outputContext;
+    
+    public StartOutputCallable(LogicalOutput output, TezOutputContext outputContext) {
+      this.output = output;
+      this.outputContext = outputContext;
+    }
+    
+    @Override
+    public Void call() throws Exception {
+      LOG.info("Starting Output with dest edge: " + outputContext.getDestinationVertexName());
+      List<Event> events = output.start();
+      sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
+          outputContext.getTaskVertexName(), outputContext.getDestinationVertexName(),
+          taskSpec.getTaskAttemptID());
+      LOG.info("Started Output with dest edge: " + outputContext.getDestinationVertexName());
+      return null;
+    }
+  }
+
   private class InitializeOutputCallable implements Callable<Void> {
 
     private final OutputSpec outputSpec;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java
index f1f38d0..d414f31 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java
@@ -34,7 +34,8 @@ public class RuntimeUtils {
   
   private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<String, Class<?>>();
 
-  private static Class<?> getClazz(String className) {
+  @Private
+  public static Class<?> getClazz(String className) {
     Class<?> clazz = CLAZZ_CACHE.get(className);
     if (clazz == null) {
       try {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index 7509cea..c25802a 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -84,5 +84,4 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
   public boolean canCommit() throws IOException {
     return tezUmbilical.canCommit(this.taskAttemptID);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index 6a01564..9b9cfaf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -85,6 +85,12 @@ public class ShuffledMergedInput implements LogicalInput {
     return Collections.emptyList();
   }
 
+  @Override
+  public List<Event> start() {
+    // TODO TEZ-815 To be fixed in a subsequent jira if required.
+    return null;
+  }
+
   /**
    * Check if the input is ready for consumption
    *

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index 147ba09..8535c58 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -66,6 +66,12 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
   }
 
   @Override
+  public List<Event> start() {
+    // TODO TEZ-815 To be fixed in a subsequent jira if required.
+    return null;
+  }
+
+  @Override
   public KeyValueReader getReader() throws Exception {
     if (numInputs == 0) {
       return new KeyValueReader() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 3b69e09..c9a12b2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -80,6 +80,12 @@ public class OnFileSortedOutput implements LogicalOutput {
   }
 
   @Override
+  public List<Event> start() {
+    // TODO TEZ-815 To be fixed in a subsequent jira if required.
+    return null;
+  }
+
+  @Override
   public KeyValueWriter getWriter() throws IOException {
     return new KeyValueWriter() {
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index 4007816..eb80940 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -88,6 +88,12 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
   }
 
   @Override
+  public List<Event> start() {
+    // TODO TEZ-815 To be fixed in a subsequent jira if required.
+    return null;
+  }
+
+  @Override
   public KeyValueWriter getWriter() throws Exception {
     return kvWriter;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index b24db78..3063ec2 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -224,6 +224,12 @@ public class TestInput implements LogicalInput {
   }
 
   @Override
+  public List<Event> start() {
+    // TODO TEZ-815 To be fixed in a subsequent jira if required.
+    return null;
+  }
+
+  @Override
   public Reader getReader() throws Exception {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
index 1e71b08..5f2edae 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
@@ -51,6 +51,12 @@ public class TestOutput implements LogicalOutput {
   }
 
   @Override
+  public List<Event> start() {
+    // TODO TEZ-815 To be fixed in a subsequent jira if required.
+    return null;
+  }
+
+  @Override
   public Writer getWriter() throws Exception {
     return null;
   }