You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/18 04:10:19 UTC
git commit: TEZ-668. Allow Processors to control Input / Output
start. (sseth)
Repository: incubator-tez
Updated Branches:
refs/heads/master 41bbbb3b0 -> e5ee79198
TEZ-668. Allow Processors to control Input / Output start. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/e5ee7919
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/e5ee7919
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/e5ee7919
Branch: refs/heads/master
Commit: e5ee791982805972829bca58dd05c76d9bc792db
Parents: 41bbbb3
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Feb 17 19:09:51 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Feb 17 19:09:51 2014 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../java/org/apache/tez/runtime/api/Input.java | 18 +-
.../tez/runtime/api/MergedLogicalInput.java | 14 +-
.../java/org/apache/tez/runtime/api/Output.java | 17 +-
.../apache/hadoop/mapred/YarnTezDagChild.java | 12 +-
.../mapreduce/examples/OrderedWordCount.java | 4 -
.../tez/mapreduce/examples/UnionExample.java | 12 +
.../tez/mapreduce/examples/WordCount.java | 14 ++
.../processor/FilterByWordInputProcessor.java | 9 +-
.../processor/FilterByWordOutputProcessor.java | 9 +-
.../org/apache/tez/mapreduce/input/MRInput.java | 4 +-
.../apache/tez/mapreduce/output/MROutput.java | 3 +-
.../mapreduce/processor/map/MapProcessor.java | 7 +-
.../processor/reduce/ReduceProcessor.java | 7 +
.../tez/mapreduce/processor/MapUtils.java | 6 +-
.../processor/reduce/TestReduceProcessor.java | 5 +-
.../runtime/LogicalIOProcessorRuntimeTask.java | 78 +++---
.../TestLogicalIOProcessorRuntimeTask.java | 245 +++++++++++++++++++
.../runtime/library/input/LocalMergedInput.java | 3 +-
.../library/input/ShuffledMergedInput.java | 12 +-
.../library/input/ShuffledUnorderedKVInput.java | 12 +-
.../library/output/OnFileSortedOutput.java | 9 +-
.../library/output/OnFileUnorderedKVOutput.java | 3 +-
.../library/processor/SleepProcessor.java | 6 +
.../java/org/apache/tez/test/TestInput.java | 3 +-
.../java/org/apache/tez/test/TestOutput.java | 3 +-
.../java/org/apache/tez/test/TestProcessor.java | 8 +
27 files changed, 423 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 15a0711..86931be 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,6 +14,8 @@ INCOMPATIBLE CHANGES
TEZ-827. Separate initialize and start operations on Inputs/Outputs.
+ TEZ-668. Allow Processors to control Input/Output start
+
Release 0.2.0 - 2013-11-30
First version.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
index 4afefd1..27f66d0 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
@@ -43,20 +43,22 @@ public interface Input {
throws Exception;
/**
- * Start any processing that the Input may need to perform. This, for now, is
- * always invoked by the framework.
+ * Start any processing that the Input may need to perform. It is the
+ * responsibility of the Processor to start Inputs.
*
- * The implementation of Input is expected to be non blocking. Inputs should
- * see this as a signal to start processing, but must return control to the
- * framework before the Processor actually starts.
+ * This typically acts as a signal to Inputs to start any processing that they
+ * may require. A blocking implementation of this method should not be used
+ * as a mechanism to determine when an Input is actually ready.
*
- * Inputs should be written to handle multiple start invocations - typically
+ * This method may be invoked by the framework under certain circumstances,
+ * and as such requires the implementation to be non-blocking.
+ *
+ * Inputs must be written to handle multiple start invocations - typically
* honoring only the first one.
*
- * @return list of events that were generated during start
* @throws Exception
*/
- public List<Event> start() throws Exception;
+ public void start() throws Exception;
/**
* Gets an instance of the {@link Reader} for this <code>Output</code>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
index 405e0d8..1e82aca 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
@@ -19,7 +19,7 @@
package org.apache.tez.runtime.api;
import java.util.List;
-import com.google.common.collect.Lists;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* A LogicalInput that is used to merge the data from multiple inputs and provide a
@@ -30,6 +30,7 @@ import com.google.common.collect.Lists;
public abstract class MergedLogicalInput implements LogicalInput {
private List<Input> inputs;
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
public final void initialize(List<Input> inputs) {
this.inputs = inputs;
@@ -45,15 +46,12 @@ public abstract class MergedLogicalInput implements LogicalInput {
}
@Override
- public List<Event> start() throws Exception {
- List<Event> events = Lists.newLinkedList();
- for (Input input : inputs) {
- List<Event> inputEvents = input.start();
- if (inputEvents != null) {
- events.addAll(inputEvents);
+ public final void start() throws Exception {
+ if (!isStarted.getAndSet(true)) {
+ for (Input input : inputs) {
+ input.start();
}
}
- return events;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
index cfff7a1..e986c34 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
@@ -43,20 +43,21 @@ public interface Output {
throws Exception;
/**
- * Start any processing that the Output may need to perform. This, for now, is
- * always invoked by the framework.
+ * Start any processing that the Output may need to perform. It is the
+ * responsibility of the Processor to start Outputs.
*
- * The implementation of Output is expected to be non blocking. Outputs should
- * see this as a signal to start any processing that may be required, but must
- * return control to the framework before the Processor actually starts.
+ * This typically acts as a signal to Outputs to start any processing that they
+ * may require.
*
- * Outputs should be written to handle multiple start invocations - typically
+ * This method may be invoked by the framework under certain circumstances,
+ * and as such requires the implementation to be non-blocking.
+ *
+ * Outputs must be written to handle multiple start invocations - typically
* honoring only the first one.
*
- * @return list of events that were generated during initialization
* @throws Exception
*/
- public List<Event> start() throws Exception;
+ public void start() throws Exception;
/**
* Gets an instance of the {@link Writer} in an <code>Output</code>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 0e0d036..1288369 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -90,7 +90,9 @@ import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistryModule;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -118,6 +120,13 @@ public class YarnTezDagChild {
private static Throwable heartbeatErrorException = null;
// Implies that the task is done - and the AM is being informed.
private static AtomicBoolean currentTaskComplete = new AtomicBoolean(true);
+ /**
+ * Used to maintain information about which Inputs have been started by the
+ * framework for the specific DAG. Makes an assumption that multiple DAGs do
+ * not execute concurrently, and must be reset each time the running DAG
+ * changes.
+ */
+ private static Multimap<String, String> startedInputsMap = HashMultimap.create();
private static Thread startHeartbeatThread() {
Thread heartbeatThread = new Thread(new Runnable() {
@@ -483,6 +492,7 @@ public class YarnTezDagChild {
}
if (!lastVertexId.getDAGId().equals(newVertexId.getDAGId())) {
objectRegistry.clearCache(ObjectLifeCycle.DAG);
+ startedInputsMap = HashMultimap.create();
}
}
lastVertexId = newVertexId;
@@ -643,7 +653,7 @@ public class YarnTezDagChild {
LOG.info("LocalDirs for child: " + Arrays.toString(localDirs));
return new LogicalIOProcessorRuntimeTask(taskSpec, attemptNum, conf,
- tezUmbilical, serviceConsumerMetadata);
+ tezUmbilical, serviceConsumerMetadata, startedInputsMap);
}
// TODONEWTEZ Is this really required ?
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 3b5d573..c418bb2 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -18,7 +18,6 @@
package org.apache.tez.mapreduce.examples;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
@@ -50,11 +49,8 @@ import org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.Apps;
import org.apache.tez.client.AMConfiguration;
import org.apache.tez.client.PreWarmContext;
import org.apache.tez.client.TezClient;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
index 8a790fd..58443e8 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -116,6 +116,12 @@ public class UnionExample {
public void run(Map<String, LogicalInput> inputs,
Map<String, LogicalOutput> outputs) throws Exception {
Preconditions.checkArgument(inputs.size() == 1);
+ for (LogicalInput input : inputs.values()) {
+ input.start();
+ }
+ for (LogicalOutput output : outputs.values()) {
+ output.start();
+ }
boolean inUnion = true;
if (context.getTaskVertexName().equals("map3")) {
inUnion = false;
@@ -177,6 +183,12 @@ public class UnionExample {
Map<String, LogicalOutput> outputs) throws Exception {
Preconditions.checkArgument(inputs.size() == 2);
Preconditions.checkArgument(outputs.size() == 2);
+ for (LogicalInput input : inputs.values()) {
+ input.start();
+ }
+ for (LogicalOutput output : outputs.values()) {
+ output.start();
+ }
MROutput out = (MROutput) outputs.get("union");
MROutput allParts = (MROutput) outputs.get("all-parts");
KeyValueWriter kvWriter = out.getWriter();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index 523514d..de4cc6c 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -108,6 +108,12 @@ public class WordCount {
@Override
public void run(Map<String, LogicalInput> inputs,
Map<String, LogicalOutput> outputs) throws Exception {
+ for (LogicalInput input : inputs.values()) {
+ input.start();
+ }
+ for (LogicalOutput output : outputs.values()) {
+ output.start();
+ }
Preconditions.checkArgument(inputs.size() == 1);
Preconditions.checkArgument(outputs.size() == 1);
MRInput input = (MRInput) inputs.values().iterator().next();
@@ -146,6 +152,14 @@ public class WordCount {
public void run(Map<String, LogicalInput> inputs,
Map<String, LogicalOutput> outputs) throws Exception {
Preconditions.checkArgument(inputs.size() == 1);
+
+ for (LogicalInput input : inputs.values()) {
+ input.start();
+ }
+ for (LogicalOutput output : outputs.values()) {
+ output.start();
+ }
+
MROutput out = (MROutput) outputs.values().iterator().next();
KeyValueWriter kvWriter = out.getWriter();
KeyValuesReader kvReader = (KeyValuesReader) inputs.values().iterator().next().getReader();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
index 04dccb0..1d78366 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
@@ -73,7 +73,7 @@ public class FilterByWordInputProcessor implements LogicalIOProcessor {
@Override
public void run(Map<String, LogicalInput> inputs,
Map<String, LogicalOutput> outputs) throws Exception {
-
+
if (inputs.size() != 1) {
throw new IllegalStateException("FilterByWordInputProcessor processor can only work with a single input");
}
@@ -81,6 +81,13 @@ public class FilterByWordInputProcessor implements LogicalIOProcessor {
if (outputs.size() != 1) {
throw new IllegalStateException("FilterByWordInputProcessor processor can only work with a single output");
}
+
+ for (LogicalInput input : inputs.values()) {
+ input.start();
+ }
+ for (LogicalOutput output : outputs.values()) {
+ output.start();
+ }
LogicalInput li = inputs.values().iterator().next();
if (! (li instanceof MRInput)) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
index 7355ab7..d061ea0 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
@@ -61,7 +61,7 @@ public class FilterByWordOutputProcessor implements LogicalIOProcessor {
@Override
public void run(Map<String, LogicalInput> inputs,
Map<String, LogicalOutput> outputs) throws Exception {
-
+
if (inputs.size() != 1) {
throw new IllegalStateException("FilterByWordOutputProcessor processor can only work with a single input");
}
@@ -70,6 +70,13 @@ public class FilterByWordOutputProcessor implements LogicalIOProcessor {
throw new IllegalStateException("FilterByWordOutputProcessor processor can only work with a single output");
}
+ for (LogicalInput input : inputs.values()) {
+ input.start();
+ }
+ for (LogicalOutput output : outputs.values()) {
+ output.start();
+ }
+
LogicalInput li = inputs.values().iterator().next();
if (! (li instanceof ShuffledUnorderedKVInput)) {
throw new IllegalStateException("FilterByWordOutputProcessor processor can only work with ShuffledUnorderedKVInput");
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 7108d37..edd8e92 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -18,7 +18,6 @@
package org.apache.tez.mapreduce.input;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -159,8 +158,7 @@ public class MRInput implements LogicalInput {
}
@Override
- public List<Event> start() {
- return Collections.emptyList();
+ public void start() {
}
@Private
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 2d7b48a..f9405f9 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -191,8 +191,7 @@ public class MROutput implements LogicalOutput {
}
@Override
- public List<Event> start() {
- return null;
+ public void start() {
}
public void initCommitter(JobConf job, boolean useNewApi)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index 00a054e..b90cd11 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.split.TezGroupedSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
import org.apache.hadoop.util.ReflectionUtils;
@@ -91,6 +90,12 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
Map<String, LogicalOutput> outputs) throws Exception {
LOG.info("Running map: " + processorContext.getUniqueIdentifier());
+ for (LogicalInput input : inputs.values()) {
+ input.start();
+ }
+ for (LogicalOutput output : outputs.values()) {
+ output.start();
+ }
if (inputs.size() != 1
|| outputs.size() != 1) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index bf4f660..f98e34f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -94,6 +94,13 @@ implements LogicalIOProcessor {
Map<String, LogicalOutput> outputs) throws Exception {
LOG.info("Running reduce: " + processorContext.getUniqueIdentifier());
+
+ for (LogicalInput input : inputs.values()) {
+ input.start();
+ }
+ for (LogicalOutput output : outputs.values()) {
+ output.start();
+ }
if (outputs.size() <= 0 || outputs.size() > 1) {
throw new IOException("Invalid number of outputs"
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index e0a4c68..700acb0 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -62,6 +62,9 @@ import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezUmbilical;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import com.google.common.collect.HashMultimap;
+
+
public class MapUtils {
private static final Log LOG = LogFactory.getLog(MapUtils.class);
@@ -216,7 +219,8 @@ public class MapUtils {
0,
jobConf,
umbilical,
- serviceConsumerMetadata);
+ serviceConsumerMetadata,
+ HashMultimap.<String, String>create());
return task;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 7868ecc..5a2a83e 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -70,6 +70,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.collect.HashMultimap;
+
@SuppressWarnings("deprecation")
public class TestReduceProcessor {
@@ -184,7 +186,8 @@ public class TestReduceProcessor {
0,
reduceConf,
new TestUmbilical(),
- serviceConsumerMetadata);
+ serviceConsumerMetadata,
+ HashMultimap.<String, String>create());
task.initialize();
task.run();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 21cc810..f5b414d 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -57,8 +57,9 @@ import org.apache.tez.runtime.api.Processor;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.api.TezProcessorContext;
-import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -67,12 +68,12 @@ import org.apache.tez.runtime.api.impl.TezInputContextImpl;
import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
import org.apache.tez.runtime.api.impl.TezProcessorContextImpl;
import org.apache.tez.runtime.api.impl.TezUmbilical;
-import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import org.apache.tez.runtime.common.resources.MemoryDistributor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -108,6 +109,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
private final ExecutorService initializerExecutor;
private final CompletionService<Void> initializerCompletionService;
+
+ private final Multimap<String, String> startedInputsMap;
private LinkedBlockingQueue<TezEvent> eventsToBeProcessed;
private Thread eventRouterThread = null;
@@ -116,7 +119,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
Configuration tezConf, TezUmbilical tezUmbilical,
- Map<String, ByteBuffer> serviceConsumerMetadata) throws IOException {
+ Map<String, ByteBuffer> serviceConsumerMetadata,
+ Multimap<String, String> startedInputsMap) throws IOException {
// TODO Remove jobToken from here post TEZ-421
super(taskSpec, tezConf, tezUmbilical);
LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
@@ -149,6 +153,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
this.initializerExecutor);
this.groupInputSpecs = taskSpec.getGroupInputs();
initialMemoryDistributor = new MemoryDistributor(numInputs, numOutputs, tezConf);
+ this.startedInputsMap = startedInputsMap;
}
/**
@@ -220,24 +225,26 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
+ " for start, since it will be controlled via it's Group");
continue;
}
- numAutoStarts++;
- this.initializerCompletionService.submit(new StartInputCallable(inputsMap.get(inputSpec
- .getSourceVertexName()), inputSpec.getSourceVertexName(), taskSpec.getVertexName()));
- }
-
- if (groupInputSpecs != null) {
- for (GroupInputSpec group : groupInputSpecs) {
+ if (!inputAlreadyStarted(taskSpec.getVertexName(), inputSpec.getSourceVertexName())) {
+ startedInputsMap.put(taskSpec.getVertexName(), inputSpec.getSourceVertexName());
numAutoStarts++;
- this.initializerCompletionService.submit(new StartInputCallable(groupInputsMap.get(group
- .getGroupName()), group.getGroupName(), taskSpec.getVertexName()));
+ this.initializerCompletionService.submit(new StartInputCallable(inputsMap.get(inputSpec
+ .getSourceVertexName()), inputSpec.getSourceVertexName()));
+ LOG.info("Input: " + inputSpec.getSourceVertexName()
+ + " being auto started by the framework. Subsequent instances will not be auto-started");
}
}
- for (OutputSpec outputSpec : outputSpecs) {
- numAutoStarts++;
- this.initializerCompletionService
- .submit(new StartOutputCallable(outputsMap.get(outputSpec.getDestinationVertexName()),
- outputContextMap.get(outputSpec.getDestinationVertexName())));
+ if (groupInputSpecs != null) {
+ for (GroupInputSpec group : groupInputSpecs) {
+ if (!inputAlreadyStarted(taskSpec.getVertexName(), group.getGroupName())) {
+ numAutoStarts++;
+ this.initializerCompletionService.submit(new StartInputCallable(groupInputsMap.get(group
+ .getGroupName()), group.getGroupName()));
+ LOG.info("InputGroup: " + group.getGroupName()
+ + " being auto started by the framework. Subsequent instance will not be auto-started");
+ }
+ }
}
// Shutdown after all tasks complete.
@@ -362,46 +369,21 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
private class StartInputCallable implements Callable<Void> {
private final LogicalInput input;
private final String srcVertexName;
- private final String taskVertexName;
- public StartInputCallable(LogicalInput input, String srcVertexName, String taskVertexName) {
+ public StartInputCallable(LogicalInput input, String srcVertexName) {
this.input = input;
this.srcVertexName = srcVertexName;
- this.taskVertexName = taskVertexName;
}
@Override
public Void call() throws Exception {
LOG.info("Starting Input with src edge: " + srcVertexName);
- List<Event> events = input.start();
- sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT, taskVertexName,
- srcVertexName, taskSpec.getTaskAttemptID());
+ input.start();
LOG.info("Started Input with src edge: " + srcVertexName);
return null;
}
}
- private class StartOutputCallable implements Callable<Void> {
- private final LogicalOutput output;
- private final TezOutputContext outputContext;
-
- public StartOutputCallable(LogicalOutput output, TezOutputContext outputContext) {
- this.output = output;
- this.outputContext = outputContext;
- }
-
- @Override
- public Void call() throws Exception {
- LOG.info("Starting Output with dest edge: " + outputContext.getDestinationVertexName());
- List<Event> events = output.start();
- sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
- outputContext.getTaskVertexName(), outputContext.getDestinationVertexName(),
- taskSpec.getTaskAttemptID());
- LOG.info("Started Output with dest edge: " + outputContext.getDestinationVertexName());
- return null;
- }
- }
-
private class InitializeOutputCallable implements Callable<Void> {
private final OutputSpec outputSpec;
@@ -435,6 +417,14 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
}
+ private boolean inputAlreadyStarted(String vertexName, String edgeVertexName) {
+ if (startedInputsMap.containsKey(vertexName)
+ && startedInputsMap.get(vertexName).contains(edgeVertexName)) {
+ return true;
+ }
+ return false;
+ }
+
private void initializeGroupInputs() {
if (groupInputSpecs != null && !groupInputSpecs.isEmpty()) {
groupInputsMap = new ConcurrentHashMap<String, LogicalInput>(groupInputSpecs.size());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
new file mode 100644
index 0000000..a1d2fa2
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalIOProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.Writer;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.junit.Test;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+
+/**
+ * Tests start-up handling in {@link LogicalIOProcessorRuntimeTask}. Two tasks of the same vertex
+ * share one startedInputsMap. The assertions check that the input's start() runs only once across
+ * both tasks, and that the output (which the processor never starts) is not started at all.
+ */
+public class TestLogicalIOProcessorRuntimeTask {
+
+ @Test
+ public void testAutoStart() throws Exception {
+ // The counters are static; reset them so the absolute-count assertions below do not depend
+ // on state left behind by an earlier execution in the same JVM (re-runs, other test methods).
+ TestProcessor.runCount = 0;
+ TestInput.startCount = 0;
+ TestOutput.startCount = 0;
+
+ TezDAGID dagId = createTezDagId();
+ TezVertexID vertexId = createTezVertexId(dagId);
+ Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
+ // Shared by both tasks below so the second task can see what the first one started.
+ Multimap<String, String> startedInputsMap = HashMultimap.create();
+ TezUmbilical umbilical = mock(TezUmbilical.class);
+ TezConfiguration tezConf = new TezConfiguration();
+
+ TezTaskAttemptID taId1 = createTaskAttemptID(vertexId, 1);
+ TaskSpec task1 = createTaskSpec(taId1, "vertex1");
+
+ TezTaskAttemptID taId2 = createTaskAttemptID(vertexId, 2);
+ TaskSpec task2 = createTaskSpec(taId2, "vertex1");
+
+ LogicalIOProcessorRuntimeTask lio1 = new LogicalIOProcessorRuntimeTask(task1, 0, tezConf,
+ umbilical, serviceConsumerMetadata, startedInputsMap);
+
+ lio1.initialize();
+ lio1.run();
+ lio1.close();
+
+ // Input should've been started, Output should not have been started
+ assertEquals(1, TestProcessor.runCount);
+ assertEquals(1, TestInput.startCount);
+ assertEquals(0, TestOutput.startCount);
+
+ LogicalIOProcessorRuntimeTask lio2 = new LogicalIOProcessorRuntimeTask(task2, 0, tezConf,
+ umbilical, serviceConsumerMetadata, startedInputsMap);
+
+ lio2.initialize();
+ lio2.run();
+ lio2.close();
+
+ // Input should not have been started again, Output should not have been started
+ assertEquals(2, TestProcessor.runCount);
+ assertEquals(1, TestInput.startCount);
+ assertEquals(0, TestOutput.startCount);
+ }
+
+ /** Builds a TaskSpec wired to TestProcessor, one TestInput edge and one TestOutput edge. */
+ private TaskSpec createTaskSpec(TezTaskAttemptID taskAttemptID, String vertexName) {
+ ProcessorDescriptor processorDesc = createProcessorDescriptor();
+ TaskSpec taskSpec = new TaskSpec(taskAttemptID, vertexName, processorDesc,
+ createInputSpecList(), createOutputSpecList(), null);
+ return taskSpec;
+ }
+
+ /** Single input spec named "inedge" backed by TestInput. */
+ private List<InputSpec> createInputSpecList() {
+ InputDescriptor inputDesc = new InputDescriptor(TestInput.class.getName());
+ InputSpec inputSpec = new InputSpec("inedge", inputDesc, 1);
+ return Lists.newArrayList(inputSpec);
+ }
+
+ /** Single output spec named "outedge" backed by TestOutput. */
+ private List<OutputSpec> createOutputSpecList() {
+ OutputDescriptor outputDesc = new OutputDescriptor(TestOutput.class.getName());
+ OutputSpec outputSpec = new OutputSpec("outedge", outputDesc, 1);
+ return Lists.newArrayList(outputSpec);
+ }
+
+ private ProcessorDescriptor createProcessorDescriptor() {
+ ProcessorDescriptor desc = new ProcessorDescriptor(TestProcessor.class.getName());
+ return desc;
+ }
+
+ /** Creates an attempt id; the same index is used for both the task id and the attempt number. */
+ private TezTaskAttemptID createTaskAttemptID(TezVertexID vertexId, int taskIndex) {
+ TezTaskID taskId = TezTaskID.getInstance(vertexId, taskIndex);
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, taskIndex);
+ return taskAttemptId;
+ }
+
+ private TezVertexID createTezVertexId(TezDAGID dagId) {
+ return TezVertexID.getInstance(dagId, 1);
+ }
+
+ private TezDAGID createTezDagId() {
+ return TezDAGID.getInstance("2000", 100, 1);
+ }
+
+ /** Processor that only counts run() calls; it never starts its inputs or outputs itself. */
+ public static class TestProcessor implements LogicalIOProcessor {
+
+ public static volatile int runCount = 0;
+
+ public TestProcessor() {
+ }
+
+ @Override
+ public void initialize(TezProcessorContext processorContext) throws Exception {
+ }
+
+ @Override
+ public void handleEvents(List<Event> processorEvents) {
+ }
+
+ @Override
+ public void close() throws Exception {
+ }
+
+ @Override
+ public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
+ throws Exception {
+ runCount++;
+ }
+
+ }
+
+ /** Input that counts start() calls and requests zero initial memory. */
+ public static class TestInput implements LogicalInput {
+
+ public static volatile int startCount = 0;
+
+ public TestInput() {
+ }
+
+ @Override
+ public List<Event> initialize(TezInputContext inputContext) throws Exception {
+ inputContext.requestInitialMemory(0, null);
+ return null;
+ }
+
+ @Override
+ public void start() throws Exception {
+ startCount++;
+ }
+
+ @Override
+ public Reader getReader() throws Exception {
+ return null;
+ }
+
+ @Override
+ public void handleEvents(List<Event> inputEvents) throws Exception {
+ }
+
+ @Override
+ public List<Event> close() throws Exception {
+ return null;
+ }
+
+ @Override
+ public void setNumPhysicalInputs(int numInputs) {
+ }
+
+ }
+
+ /** Output that counts start() calls and requests zero initial memory. */
+ public static class TestOutput implements LogicalOutput {
+
+ public static volatile int startCount = 0;
+
+ public TestOutput() {
+ }
+
+ @Override
+ public List<Event> initialize(TezOutputContext outputContext) throws Exception {
+ outputContext.requestInitialMemory(0, null);
+ return null;
+ }
+
+ @Override
+ public void start() throws Exception {
+ startCount++;
+ }
+
+ @Override
+ public Writer getWriter() throws Exception {
+ return null;
+ }
+
+ @Override
+ public void handleEvents(List<Event> outputEvents) {
+ }
+
+ @Override
+ public List<Event> close() throws Exception {
+ return null;
+ }
+
+ @Override
+ public void setNumPhysicalOutputs(int numOutputs) {
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
index 4d2441c..bbeae52 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
@@ -50,8 +50,7 @@ public class LocalMergedInput extends ShuffledMergedInputLegacy {
}
@Override
- public List<Event> start() throws IOException {
- return Collections.emptyList();
+ public void start() throws IOException {
+ // Deliberately empty: this override replaces the inherited start() with a no-op.
+ // NOTE(review): presumably local merged input needs no shuffle to be launched — confirm.
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index bee7e45..35c1ab5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.input;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -64,6 +65,8 @@ public class ShuffledMergedInput implements LogicalInput {
private TezCounter inputKeyCounter;
private TezCounter inputValueCounter;
+
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
@Override
public List<Event> initialize(TezInputContext inputContext) throws IOException {
@@ -84,10 +87,11 @@ public class ShuffledMergedInput implements LogicalInput {
}
@Override
- public List<Event> start() throws IOException {
- // Start the shuffle - copy and merge
- shuffle.run();
- return Collections.emptyList();
+ public void start() throws IOException {
+ if (isStarted.getAndSet(true)) {
+ // An earlier call already launched the shuffle; repeated starts do nothing.
+ return;
+ }
+ // First call: start the shuffle - copy and merge.
+ shuffle.run();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index 61dff6f..9123345 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -21,6 +21,7 @@ package org.apache.tez.runtime.library.input;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,6 +46,8 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
@SuppressWarnings("rawtypes")
private BroadcastKVReader kvReader;
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
public ShuffledUnorderedKVInput() {
}
@@ -64,10 +67,11 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
}
@Override
- public List<Event> start() throws IOException {
- this.shuffleManager.run();
- this.kvReader = this.shuffleManager.createReader();
- return Collections.emptyList();
+ public void start() throws IOException {
+ // compareAndSet succeeds only for the first caller, which runs the shuffle manager and
+ // builds the reader; every later call falls through without doing anything.
+ if (isStarted.compareAndSet(false, true)) {
+ shuffleManager.run();
+ kvReader = shuffleManager.createReader();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 38f6bf9..06bafa5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -55,6 +56,7 @@ public class OnFileSortedOutput implements LogicalOutput {
private long startTime;
private long endTime;
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
@Override
@@ -80,9 +82,10 @@ public class OnFileSortedOutput implements LogicalOutput {
}
@Override
- public List<Event> start() throws Exception {
- sorter.start();
- return Collections.emptyList();
+ public void start() throws Exception {
+ if (!isStarted.getAndSet(true)) {
+ sorter.start();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index 04d638f..6cc6554 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -87,8 +87,7 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
}
@Override
- public List<Event> start() {
- return Collections.emptyList();
+ public void start() {
+ // Intentionally a no-op: this output does no work when started.
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
index 2e3aba0..fada3fd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
@@ -62,6 +62,12 @@ public class SleepProcessor implements LogicalIOProcessor {
Map<String, LogicalOutput> outputs) throws Exception {
LOG.info("Running the Sleep Processor, sleeping for "
+ timeToSleepMS + " ms");
+ for (LogicalInput input : inputs.values()) {
+ input.start();
+ }
+ for (LogicalOutput output : outputs.values()) {
+ output.start();
+ }
try {
Thread.sleep(timeToSleepMS);
} catch (InterruptedException ie) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index a49f5b2..8ac1585 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -246,8 +246,7 @@ public class TestInput implements LogicalInput {
}
@Override
- public List<Event> start() {
- return Collections.emptyList();
+ public void start() {
+ // No-op: this test input has nothing to launch when started.
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
index ddd2a29..bef4218 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
@@ -52,8 +52,7 @@ public class TestOutput implements LogicalOutput {
}
@Override
- public List<Event> start() {
- return Collections.emptyList();
+ public void start() {
+ // No-op: this test output has nothing to launch when started.
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e5ee7919/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
index c314042..2ebe3f1 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
@@ -134,6 +134,14 @@ public class TestProcessor implements LogicalIOProcessor {
public void run(Map<String, LogicalInput> inputs,
Map<String, LogicalOutput> outputs) throws Exception {
LOG.info("Sleeping ms: " + sleepMs);
+
+ for (LogicalInput input : inputs.values()) {
+ input.start();
+ }
+ for (LogicalOutput output : outputs.values()) {
+ output.start();
+ }
+
Thread.sleep(sleepMs);
if (doFail) {