You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/12 22:57:34 UTC
git commit: TEZ-827. Separate initialize and start operations on
Inputs/Outputs. (sseth)
Updated Branches:
refs/heads/master 54e232211 -> 8d265ed46
TEZ-827. Separate initialize and start operations on Inputs/Outputs.
(sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8d265ed4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8d265ed4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8d265ed4
Branch: refs/heads/master
Commit: 8d265ed4686f646f53ca388cf91d4966815f1eaf
Parents: 54e2322
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Feb 12 13:57:01 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Feb 12 13:57:24 2014 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../java/org/apache/tez/runtime/api/Input.java | 16 +++
.../tez/runtime/api/MergedLogicalInput.java | 10 +-
.../java/org/apache/tez/runtime/api/Output.java | 16 +++
.../apache/tez/runtime/api/TezInputContext.java | 1 -
.../org/apache/tez/mapreduce/input/MRInput.java | 6 ++
.../apache/tez/mapreduce/output/MROutput.java | 6 ++
.../processor/reduce/TestReduceProcessor.java | 1 +
.../runtime/LogicalIOProcessorRuntimeTask.java | 107 ++++++++++++++++++-
.../org/apache/tez/runtime/RuntimeUtils.java | 3 +-
.../api/impl/TezProcessorContextImpl.java | 1 -
.../library/input/ShuffledMergedInput.java | 6 ++
.../library/input/ShuffledUnorderedKVInput.java | 6 ++
.../library/output/OnFileSortedOutput.java | 6 ++
.../library/output/OnFileUnorderedKVOutput.java | 6 ++
.../java/org/apache/tez/test/TestInput.java | 6 ++
.../java/org/apache/tez/test/TestOutput.java | 6 ++
17 files changed, 196 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 86f5394..15a0711 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,8 @@ INCOMPATIBLE CHANGES
TEZ-650. MRHelpers.createMRInputPayloadWithGrouping() methods should not
take an MRSplitsProto argument.
+ TEZ-827. Separate initialize and start operations on Inputs/Outputs.
+
Release 0.2.0 - 2013-11-30
First version.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
index c064367..4afefd1 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
@@ -43,6 +43,22 @@ public interface Input {
throws Exception;
/**
+ * Start any processing that the Input may need to perform. This, for now, is
+ * always invoked by the framework.
+ *
+ * The implementation of Input is expected to be non blocking. Inputs should
+ * see this as a signal to start processing, but must return control to the
+ * framework before the Processor actually starts.
+ *
+ * Inputs should be written to handle multiple start invocations - typically
+ * honoring only the first one.
+ *
+ * @return list of events that were generated during start
+ * @throws Exception
+ */
+ public List<Event> start() throws Exception;
+
+ /**
* Gets an instance of the {@link Reader} for this <code>Output</code>
*
* @return Gets an instance of the {@link Reader} for this <code>Output</code>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
index b879b4c..d91a863 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
@@ -30,7 +30,7 @@ public abstract class MergedLogicalInput implements LogicalInput {
private List<Input> inputs;
- public void initialize(List<Input> inputs) {
+ public final void initialize(List<Input> inputs) {
this.inputs = inputs;
}
@@ -44,6 +44,14 @@ public abstract class MergedLogicalInput implements LogicalInput {
}
@Override
+ public List<Event> start() throws Exception {
+ for (Input input : inputs) {
+ input.start();
+ }
+ return null;
+ }
+
+ @Override
public final void handleEvents(List<Event> inputEvents) throws Exception {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
index f61e5b3..cfff7a1 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
@@ -43,6 +43,22 @@ public interface Output {
throws Exception;
/**
+ * Start any processing that the Output may need to perform. This, for now, is
+ * always invoked by the framework.
+ *
+ * The implementation of Output is expected to be non blocking. Outputs should
+ * see this as a signal to start any processing that may be required, but must
+ * return control to the framework before the Processor actually starts.
+ *
+ * Outputs should be written to handle multiple start invocations - typically
+ * honoring only the first one.
+ *
+ * @return list of events that were generated during initialization
+ * @throws Exception
+ */
+ public List<Event> start() throws Exception;
+
+ /**
* Gets an instance of the {@link Writer} in an <code>Output</code>
*
* @return Gets an instance of the {@link Writer} in an <code>Output</code>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
index 79731b5..0e37030 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
@@ -35,5 +35,4 @@ public interface TezInputContext extends TezTaskContext {
* @return index
*/
public int getInputIndex();
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index dda57fc..70003ef 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -157,6 +157,12 @@ public class MRInput implements LogicalInput {
initializeInternal();
return null;
}
+
+ @Override
+ public List<Event> start() {
+ // TODO TEZ-815 To be fixed in a subsequent jira if required.
+ return null;
+ }
@Private
void initializeInternal() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index e5d223d..efa18b4 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -190,6 +190,12 @@ public class MROutput implements LogicalOutput {
+ ", using_new_api: " + useNewApi);
return null;
}
+
+ @Override
+ public List<Event> start() {
+ // TODO TEZ-815 To be fixed in a subsequent jira if required.
+ return null;
+ }
public void initCommitter(JobConf job, boolean useNewApi)
throws IOException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 1981872..2f2ec16 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -17,6 +17,7 @@
*/
package org.apache.tez.mapreduce.processor.reduce;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 62de3ae..a8bb1d4 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -150,6 +150,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
initialMemoryDistributor = new MemoryDistributor(numInputs, numOutputs, tezConf);
}
+ /**
+ * @throws Exception
+ */
public void initialize() throws Exception {
LOG.info("Initializing LogicalProcessorIORuntimeTask");
Preconditions.checkState(this.state == State.NEW, "Already initialized");
@@ -170,8 +173,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
new InitializeOutputCallable(outputSpec, outputIndex++));
numTasks++;
}
- // Shutdown after all tasks complete.
- this.initializerExecutor.shutdown();
// Initialize processor in the current thread.
initializeLogicalIOProcessor();
@@ -192,11 +193,11 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
}
LOG.info("All initializers finished");
- initialMemoryDistributor.makeInitialAllocations();
-
// group inputs depend on inputs beings initialized. So must be done after.
initializeGroupInputs();
+ // Grouped input start will be controlled by the start of the GroupedInput
+ // Construct the set of groupedInputs up front so that start is not invoked on them.
Set<String> groupInputs = Sets.newHashSet();
// Construct Inputs/Outputs map argument for processor.run()
// first add the group inputs
@@ -207,6 +208,59 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
groupInputs.addAll(groupInputSpec.getGroupVertices());
}
}
+
+ initialMemoryDistributor.makeInitialAllocations();
+
+ LOG.info("Starting Inputs/Outputs");
+ int numAutoStarts = 0;
+ for (InputSpec inputSpec : inputSpecs) {
+ if (groupInputs.contains(inputSpec.getSourceVertexName())) {
+ LOG.info("Ignoring " + inputSpec.getSourceVertexName()
+ + " for start, since it will be controlled via it's Group");
+ continue;
+ }
+ numAutoStarts++;
+ this.initializerCompletionService.submit(new StartInputCallable(inputsMap.get(inputSpec
+ .getSourceVertexName()), inputSpec.getSourceVertexName(), taskSpec.getVertexName()));
+ }
+
+ if (groupInputSpecs != null) {
+ for (GroupInputSpec group : groupInputSpecs) {
+ numAutoStarts++;
+ this.initializerCompletionService.submit(new StartInputCallable(groupInputsMap.get(group
+ .getGroupName()), group.getGroupName(), taskSpec.getVertexName()));
+ }
+ }
+
+ for (OutputSpec outputSpec : outputSpecs) {
+ numAutoStarts++;
+ this.initializerCompletionService
+ .submit(new StartOutputCallable(outputsMap.get(outputSpec.getDestinationVertexName()),
+ outputContextMap.get(outputSpec.getDestinationVertexName())));
+ }
+
+ // Shutdown after all tasks complete.
+ this.initializerExecutor.shutdown();
+
+ completedTasks = 0;
+ LOG.info("Num IOs determined for AutoStart: " + numAutoStarts);
+ while (completedTasks < numAutoStarts) {
+ LOG.info("Waiting for " + (numAutoStarts - completedTasks) + " IOs to start");
+ Future<Void> future = initializerCompletionService.take();
+ try {
+ future.get();
+ completedTasks++;
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof Exception) {
+ throw (Exception) e.getCause();
+ } else {
+ throw new Exception(e);
+ }
+ }
+ }
+
+
+
// then add the non-grouped inputs
for (InputSpec inputSpec : inputSpecs) {
if (!groupInputs.contains(inputSpec.getSourceVertexName())) {
@@ -214,7 +268,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
runInputMap.put(inputSpec.getSourceVertexName(), input);
}
}
-
+
for (OutputSpec outputSpec : outputSpecs) {
LogicalOutput output = outputsMap.get(outputSpec.getDestinationVertexName());
String outputName = outputSpec.getDestinationVertexName();
@@ -304,6 +358,49 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
}
+ private class StartInputCallable implements Callable<Void> {
+ private final LogicalInput input;
+ private final String srcVertexName;
+ private final String taskVertexName;
+
+ public StartInputCallable(LogicalInput input, String srcVertexName, String taskVertexName) {
+ this.input = input;
+ this.srcVertexName = srcVertexName;
+ this.taskVertexName = taskVertexName;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ LOG.info("Starting Input with src edge: " + srcVertexName);
+ List<Event> events = input.start();
+ sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT, taskVertexName,
+ srcVertexName, taskSpec.getTaskAttemptID());
+ LOG.info("Started Input with src edge: " + srcVertexName);
+ return null;
+ }
+ }
+
+ private class StartOutputCallable implements Callable<Void> {
+ private final LogicalOutput output;
+ private final TezOutputContext outputContext;
+
+ public StartOutputCallable(LogicalOutput output, TezOutputContext outputContext) {
+ this.output = output;
+ this.outputContext = outputContext;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ LOG.info("Starting Output with dest edge: " + outputContext.getDestinationVertexName());
+ List<Event> events = output.start();
+ sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
+ outputContext.getTaskVertexName(), outputContext.getDestinationVertexName(),
+ taskSpec.getTaskAttemptID());
+ LOG.info("Started Output with dest edge: " + outputContext.getDestinationVertexName());
+ return null;
+ }
+ }
+
private class InitializeOutputCallable implements Callable<Void> {
private final OutputSpec outputSpec;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java
index f1f38d0..d414f31 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java
@@ -34,7 +34,8 @@ public class RuntimeUtils {
private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<String, Class<?>>();
- private static Class<?> getClazz(String className) {
+ @Private
+ public static Class<?> getClazz(String className) {
Class<?> clazz = CLAZZ_CACHE.get(className);
if (clazz == null) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index 7509cea..c25802a 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -84,5 +84,4 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
public boolean canCommit() throws IOException {
return tezUmbilical.canCommit(this.taskAttemptID);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index 6a01564..9b9cfaf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -85,6 +85,12 @@ public class ShuffledMergedInput implements LogicalInput {
return Collections.emptyList();
}
+ @Override
+ public List<Event> start() {
+ // TODO TEZ-815 To be fixed in a subsequent jira if required.
+ return null;
+ }
+
/**
* Check if the input is ready for consumption
*
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index 147ba09..8535c58 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -66,6 +66,12 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
}
@Override
+ public List<Event> start() {
+ // TODO TEZ-815 To be fixed in a subsequent jira if required.
+ return null;
+ }
+
+ @Override
public KeyValueReader getReader() throws Exception {
if (numInputs == 0) {
return new KeyValueReader() {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 3b69e09..c9a12b2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -80,6 +80,12 @@ public class OnFileSortedOutput implements LogicalOutput {
}
@Override
+ public List<Event> start() {
+ // TODO TEZ-815 To be fixed in a subsequent jira if required.
+ return null;
+ }
+
+ @Override
public KeyValueWriter getWriter() throws IOException {
return new KeyValueWriter() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index 4007816..eb80940 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -88,6 +88,12 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
}
@Override
+ public List<Event> start() {
+ // TODO TEZ-815 To be fixed in a subsequent jira if required.
+ return null;
+ }
+
+ @Override
public KeyValueWriter getWriter() throws Exception {
return kvWriter;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index b24db78..3063ec2 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -224,6 +224,12 @@ public class TestInput implements LogicalInput {
}
@Override
+ public List<Event> start() {
+ // TODO TEZ-815 To be fixed in a subsequent jira if required.
+ return null;
+ }
+
+ @Override
public Reader getReader() throws Exception {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d265ed4/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
index 1e71b08..5f2edae 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
@@ -51,6 +51,12 @@ public class TestOutput implements LogicalOutput {
}
@Override
+ public List<Event> start() {
+ // TODO TEZ-815 To be fixed in a subsequent jira if required.
+ return null;
+ }
+
+ @Override
public Writer getWriter() throws Exception {
return null;
}