You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in this plain-text rendering.)
Posted to commits@tez.apache.org by rb...@apache.org on 2016/09/15 12:11:38 UTC
tez git commit: TEZ-3317. Speculative execution starts too early due to 0 progress (Kuhu Shukla via rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master a23de4982 -> b17edc401
TEZ-3317. Speculative execution starts too early due to 0 progress (Kuhu Shukla via rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b17edc40
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b17edc40
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b17edc40
Branch: refs/heads/master
Commit: b17edc401f7622d8aa3fde6d45767e6e18b34c72
Parents: a23de49
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Thu Sep 15 17:41:08 2016 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Thu Sep 15 17:41:08 2016 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/runtime/api/AbstractLogicalInput.java | 5 +
.../tez/runtime/api/MergedLogicalInput.java | 5 +
.../org/apache/tez/mapreduce/input/MRInput.java | 5 +-
.../tez/mapreduce/input/MRInputLegacy.java | 4 +
.../mapreduce/processor/map/MapProcessor.java | 61 ++++++--
.../processor/reduce/ReduceProcessor.java | 59 ++++++--
.../processor/map/TestMapProcessor.java | 142 +++++++++++++++++++
.../api/impl/TezProcessorContextImpl.java | 6 +-
.../common/readers/UnorderedKVReader.java | 15 ++
.../common/shuffle/impl/ShuffleManager.java | 9 ++
.../input/ConcatenatedMergedKeyValueInput.java | 14 +-
.../input/ConcatenatedMergedKeyValuesInput.java | 15 +-
.../library/input/OrderedGroupedKVInput.java | 17 +++
.../input/OrderedGroupedMergedKVInput.java | 8 +-
.../runtime/library/input/UnorderedKVInput.java | 4 +
.../library/processor/SimpleProcessor.java | 40 +++++-
.../library/processor/SleepProcessor.java | 45 +++++-
.../processor/FilterByWordInputProcessor.java | 53 +++++--
.../processor/FilterByWordOutputProcessor.java | 1 +
20 files changed, 457 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 99bae83..ed0ef7b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3317. Speculative execution starts too early due to 0 progress.
TEZ-3404. Move blocking call for YARN Timeline domain creation from client side to AM.
TEZ-3272. Add AMContainerImpl and AMNodeImpl to StateMachine visualization list.
TEZ-3284. Synchronization for every write in UnorderdKVWriter
http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
index dea79b7..4c95eb9 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
@@ -17,6 +17,7 @@
*/
package org.apache.tez.runtime.api;
+import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -80,4 +81,8 @@ public abstract class AbstractLogicalInput implements LogicalInput, LogicalInput
public final InputContext getContext() {
return inputContext;
}
+
+ public float getProgress() throws IOException, InterruptedException {
+ return 0.0f;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
index dedc902..3195a17 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
@@ -18,6 +18,7 @@
package org.apache.tez.runtime.api;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -92,4 +93,8 @@ public abstract class MergedLogicalInput implements LogicalInput {
* Used by the framework to inform the MergedInput that one of it's constituent Inputs is ready.
*/
public abstract void setConstituentInputIsReady(Input input);
+
+ public float getProgress() throws IOException, InterruptedException {
+ return 0.0f;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index af4b05c..b83d1a3 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -611,8 +611,9 @@ public class MRInput extends MRInputBase {
}
}
- public float getProgress() throws IOException, InterruptedException {
- return mrReader.getProgress();
+ @Override
+ public float getProgress() throws IOException,InterruptedException {
+ return (mrReader != null) ? mrReader.getProgress() : 0.0f;
}
void processSplitEvent(InputDataInformationEvent event)
http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
index e83c36a..9b5ed1c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
@@ -87,6 +87,10 @@ public class MRInputLegacy extends MRInput {
return (org.apache.hadoop.mapreduce.RecordReader) mrReader.getRecordReader();
}
+ public float getProgress() throws IOException, InterruptedException {
+ return super.getProgress();
+ }
+
@Private
public InputSplit getOldInputSplit() {
return (InputSplit) mrReader.getSplit();
http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index 1a12a21..ed22d2b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +46,7 @@ import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutputLegacy;
import org.apache.tez.mapreduce.processor.MRTask;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
@@ -58,6 +61,35 @@ public class MapProcessor extends MRTask{
private static final Logger LOG = LoggerFactory.getLogger(MapProcessor.class);
+ protected Map<String, LogicalInput> inputs;
+ protected Map<String, LogicalOutput> outputs;
+ Timer progressTimer = new Timer();
+ TimerTask progressTask = new TimerTask() {
+
+ @Override
+ public void run() {
+ try {
+ float progSum = 0.0f;
+ if (inputs != null && inputs.size() != 0) {
+ for(LogicalInput input : inputs.values()) {
+ if (input instanceof AbstractLogicalInput) {
+ progSum += ((AbstractLogicalInput) input).getProgress();
+ }
+ }
+ float progress = (1.0f) * progSum / inputs.size();
+ getContext().setProgress(progress);
+ mrReporter.setProgress(progress);
+ }
+ } catch (IOException e) {
+ LOG.warn("Encountered IOException during Processor progress update"
+ + e.getMessage());
+ } catch (InterruptedException e) {
+ LOG.warn("Encountered InterruptedException during Processor progress"
+ + "update" + e.getMessage());
+ }
+ }
+ };
+
public MapProcessor(ProcessorContext processorContext) {
super(processorContext, true);
}
@@ -69,33 +101,34 @@ public class MapProcessor extends MRTask{
}
public void close() throws IOException {
- // TODO Auto-generated method stub
+ progressTimer.cancel();
}
@Override
- public void run(Map<String, LogicalInput> inputs,
- Map<String, LogicalOutput> outputs) throws Exception {
-
+ public void run(Map<String, LogicalInput> _inputs,
+ Map<String, LogicalOutput> _outputs) throws Exception {
+ this.inputs = _inputs;
+ this.outputs = _outputs;
LOG.info("Running map: " + processorContext.getUniqueIdentifier());
- for (LogicalInput input : inputs.values()) {
+ for (LogicalInput input : _inputs.values()) {
input.start();
}
- for (LogicalOutput output : outputs.values()) {
+ for (LogicalOutput output : _outputs.values()) {
output.start();
}
- if (inputs.size() != 1
- || outputs.size() != 1) {
- throw new IOException("Cannot handle multiple inputs or outputs"
- + ", inputCount=" + inputs.size()
- + ", outputCount=" + outputs.size());
+ if (_inputs.size() != 1
+ || _outputs.size() != 1) {
+ throw new IOException("Cannot handle multiple _inputs or _outputs"
+ + ", inputCount=" + _inputs.size()
+ + ", outputCount=" + _outputs.size());
}
- LogicalInput in = inputs.values().iterator().next();
- LogicalOutput out = outputs.values().iterator().next();
+ LogicalInput in = _inputs.values().iterator().next();
+ LogicalOutput out = _outputs.values().iterator().next();
initTask(out);
-
+ progressTimer.schedule(progressTask, 0, 100);
// Sanity check
if (!(in instanceof MRInputLegacy)) {
throw new IOException(new TezException(
http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 996cf84..8ec6091 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -22,6 +22,8 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +44,7 @@ import org.apache.tez.dag.api.TezException;
import org.apache.tez.mapreduce.output.MROutputLegacy;
import org.apache.tez.mapreduce.processor.MRTask;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.LogicalInput;
@@ -63,6 +66,35 @@ public class ReduceProcessor extends MRTask {
private Counter reduceInputKeyCounter;
private Counter reduceInputValueCounter;
+ protected Map<String, LogicalInput> inputs;
+ protected Map<String, LogicalOutput> outputs;
+ Timer progressTimer = new Timer();
+ TimerTask progressTask = new TimerTask() {
+
+ @Override
+ public void run() {
+ try {
+ float progSum = 0.0f;
+ if (inputs != null && inputs.size() != 0) {
+ for(LogicalInput input : inputs.values()) {
+ if (input instanceof AbstractLogicalInput) {
+ progSum += ((AbstractLogicalInput) input).getProgress();
+ }
+ }
+ float progress = (1.0f) * progSum / inputs.size();
+ getContext().setProgress(progress);
+ mrReporter.setProgress(progress);
+ }
+ } catch (IOException e) {
+ LOG.warn("Encountered IOException during Processor progress update"
+ + e.getMessage());
+ } catch (InterruptedException e) {
+ LOG.warn("Encountered InterruptedException during Processor progress"
+ + "update" + e.getMessage());
+ }
+ }
+ };
+
public ReduceProcessor(ProcessorContext processorContext) {
super(processorContext, false);
}
@@ -74,27 +106,28 @@ public class ReduceProcessor extends MRTask {
}
public void close() throws IOException {
- // TODO Auto-generated method stub
+ progressTimer.cancel();
}
@Override
- public void run(Map<String, LogicalInput> inputs,
- Map<String, LogicalOutput> outputs) throws Exception {
-
+ public void run(Map<String, LogicalInput> _inputs,
+ Map<String, LogicalOutput> _outputs) throws Exception {
+ this.inputs = _inputs;
+ this.outputs = _outputs;
LOG.info("Running reduce: " + processorContext.getUniqueIdentifier());
- if (outputs.size() <= 0 || outputs.size() > 1) {
- throw new IOException("Invalid number of outputs"
- + ", outputCount=" + outputs.size());
+ if (_outputs.size() <= 0 || _outputs.size() > 1) {
+ throw new IOException("Invalid number of _outputs"
+ + ", outputCount=" + _outputs.size());
}
- if (inputs.size() <= 0 || inputs.size() > 1) {
- throw new IOException("Invalid number of inputs"
- + ", inputCount=" + inputs.size());
+ if (_inputs.size() <= 0 || _inputs.size() > 1) {
+ throw new IOException("Invalid number of _inputs"
+ + ", inputCount=" + _inputs.size());
}
- LogicalInput in = inputs.values().iterator().next();
+ LogicalInput in = _inputs.values().iterator().next();
in.start();
List<Input> pendingInputs = new LinkedList<Input>();
@@ -102,11 +135,11 @@ public class ReduceProcessor extends MRTask {
processorContext.waitForAllInputsReady(pendingInputs);
LOG.info("Input is ready for consumption. Starting Output");
- LogicalOutput out = outputs.values().iterator().next();
+ LogicalOutput out = _outputs.values().iterator().next();
out.start();
initTask(out);
-
+ progressTimer.schedule(progressTask, 0, 100);
this.statusUpdate();
Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 70f8763..53b8c46 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -21,7 +21,22 @@ package org.apache.tez.mapreduce.processor.map;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
+import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -69,6 +84,9 @@ public class TestMapProcessor {
private static JobConf defaultConf = new JobConf();
private static FileSystem localFs = null;
private static Path workDir = null;
+ static float progressUpdate = 0.0f;
+ final private static FsPermission JOB_FILE_PERMISSION = FsPermission
+ .createImmutable((short) 0644);
static {
try {
defaultConf.set("fs.defaultFS", "file:///");
@@ -184,4 +202,128 @@ public class TestMapProcessor {
}
reader.close();
}
+
+ @Test(timeout = 10000)
+ public void testMapProcessorProgress() throws Exception {
+ String dagName = "mrdag0";
+ String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
+ JobConf jobConf = new JobConf(defaultConf);
+ setUpJobConf(jobConf);
+
+ MRHelpers.translateMRConfToTez(jobConf);
+ jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
+
+ jobConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
+
+ jobConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
+ "localized-resources").toUri().toString());
+
+ Path mapInput = new Path(workDir, "map0");
+
+
+ generateInputSplit(localFs, workDir, jobConf, mapInput);
+
+ InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
+ InputDescriptor.create(MRInputLegacy.class.getName())
+ .setUserPayload(UserPayload.create(ByteBuffer.wrap(
+ MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
+ .setConfigurationBytes(TezUtils.createByteStringFromConf
+ (jobConf)).build()
+ .toByteArray()))),
+ 1);
+ OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex",
+ OutputDescriptor.create(OrderedPartitionedKVOutput.class.getName())
+ .setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);
+
+ final LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask
+ (localFs, workDir, jobConf, 0,
+ new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName,
+ Collections.singletonList(mapInputSpec),
+ Collections.singletonList(mapOutputSpec));
+
+ ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+ Thread monitorProgress = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ float prog = task.getProgress();
+ if(prog > 0.0 && prog < 1.0)
+ progressUpdate = prog;
+ }
+ });
+
+ task.initialize();
+ scheduler.scheduleAtFixedRate(monitorProgress, 0, 10,
+ TimeUnit.MILLISECONDS);
+ task.run();
+ Assert.assertTrue("Progress Updates should be captured!",
+ progressUpdate != 0.0f);
+ task.close();
+ }
+
+ public static void generateInputSplit(FileSystem fs, Path workDir,
+ JobConf jobConf, Path mapInput)
+ throws IOException {
+ jobConf.setInputFormat(SequenceFileInputFormat.class);
+ FileInputFormat.setInputPaths(jobConf, workDir);
+
+ LOG.info("Generating data at path: " + mapInput);
+ // create a file with length entries
+ SequenceFile.Writer writer =
+ SequenceFile.createWriter(fs, jobConf, mapInput,
+ LongWritable.class, Text.class);
+ try {
+ Random r = new Random(System.currentTimeMillis());
+ LongWritable key = new LongWritable();
+ Text value = new Text();
+ for (int i = 100000; i > 0; i--) {
+ key.set(r.nextInt(1000));
+ value.set(Integer.toString(i));
+ writer.append(key, value);
+ LOG.info("<k, v> : <" + key.get() + ", " + value + ">");
+ }
+ } finally {
+ writer.close();
+ }
+
+ SequenceFileInputFormat<LongWritable, Text> format =
+ new SequenceFileInputFormat<LongWritable, Text>();
+ InputSplit[] splits = format.getSplits(jobConf, 1);
+ System.err.println("#split = " + splits.length + " ; " +
+ "#locs = " + splits[0].getLocations().length + "; " +
+ "loc = " + splits[0].getLocations()[0] + "; " +
+ "off = " + splits[0].getLength() + "; " +
+ "file = " + ((FileSplit)splits[0]).getPath());
+ writeSplitFiles(fs, jobConf, splits[0]);
+ }
+
+ private static void writeSplitFiles(FileSystem fs, JobConf conf,
+ InputSplit split) throws IOException {
+ Path jobSplitFile = new Path(conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR,
+ MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR_DEFAULT), MRJobConfig.JOB_SPLIT);
+ LOG.info("Writing split to: " + jobSplitFile);
+ FSDataOutputStream out = FileSystem.create(fs, jobSplitFile,
+ new FsPermission(JOB_FILE_PERMISSION));
+
+ long offset = out.getPos();
+ Text.writeString(out, split.getClass().getName());
+ split.write(out);
+ out.close();
+
+ String[] locations = split.getLocations();
+
+ JobSplit.SplitMetaInfo info = null;
+ info = new JobSplit.SplitMetaInfo(locations, offset, split.getLength());
+
+ Path jobSplitMetaInfoFile = new Path(
+ conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR),
+ MRJobConfig.JOB_SPLIT_METAINFO);
+
+ FSDataOutputStream outMeta = FileSystem.create(fs, jobSplitMetaInfoFile,
+ new FsPermission(JOB_FILE_PERMISSION));
+ outMeta.write(SplitMetaInfoReaderTez.META_SPLIT_FILE_HEADER);
+ WritableUtils.writeVInt(outMeta, SplitMetaInfoReaderTez.META_SPLIT_VERSION);
+ WritableUtils.writeVInt(outMeta, 1); // Only 1 split meta info being written
+ info.write(outMeta);
+ outMeta.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index d7c2d3e..c00b977 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -92,8 +92,10 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
@Override
public void setProgress(float progress) {
- runtimeTask.setProgress(progress);
- notifyProgress();
+ if (runtimeTask.getProgress() != progress) {
+ runtimeTask.setProgress(progress);
+ notifyProgress();
+ }
}
@SuppressWarnings("deprecation")
http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
index 57bb121..3f44c4f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
@@ -73,6 +73,8 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
// TODO Remove this once per I/O counters are separated properly. Relying on
// the counter at the moment will generate aggregate numbers.
private int numRecordsRead = 0;
+ private long totalBytesRead = 0;
+ private long totalFileBytes = 0;
public UnorderedKVReader(ShuffleManager shuffleManager, Configuration conf,
@@ -146,6 +148,17 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
return value;
}
+ public float getProgress() {
+ int numInputs = shuffleManager.getNumInputs();
+ if (totalFileBytes > 0 && numInputs > 0) {
+ return ((1.0f) * (totalBytesRead + ((currentReader != null) ? currentReader.bytesRead :
+ 0l)) /
+ totalFileBytes) * (
+ shuffleManager.getNumCompletedInputs().floatValue() /
+ (1.0f * numInputs));
+ }
+ return 0l;
+ }
/**
* Tries reading the next key and value from the current reader.
* @return true if the current reader has more records
@@ -176,6 +189,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
*/
private boolean moveToNextInput() throws IOException {
if (currentReader != null) { // Close the current reader.
+ totalBytesRead += currentReader.bytesRead;
currentReader.close();
/**
* clear reader explicitly. Otherwise this could point to stale reference when next() is
@@ -196,6 +210,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
return false; // No more inputs
} else {
currentReader = openIFileReader(currentFetchedInput);
+ totalFileBytes += currentReader.getLength();
return true;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index c80713b..a726924 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -875,6 +875,15 @@ public class ShuffleManager implements FetcherCallback {
} while (input instanceof NullFetchedInput);
return input;
}
+
+ public int getNumInputs() {
+ return numInputs;
+ }
+
+ public AtomicInteger getNumCompletedInputs() {
+ return numCompletedInputs;
+ }
+
/////////////////// End of methods for walking the available inputs
http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
index 0b8ed21..743b628 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
@@ -37,6 +37,7 @@ import org.apache.tez.runtime.library.api.KeyValueReader;
*/
@Public
public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
+ private ConcatenatedMergedKeyValueReader concatenatedMergedKeyValueReader;
public ConcatenatedMergedKeyValueInput(MergedInputContext context,
List<Input> inputs) {
@@ -87,7 +88,10 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
public Object getCurrentValue() throws IOException {
return currentReader.getCurrentValue();
}
-
+
+ public float getProgress() {
+ return (1.0f)*(currentReaderIndex + 1)/getInputs().size();
+ }
}
/**
@@ -96,11 +100,17 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
*/
@Override
public KeyValueReader getReader() throws Exception {
- return new ConcatenatedMergedKeyValueReader();
+ concatenatedMergedKeyValueReader = new ConcatenatedMergedKeyValueReader();
+ return concatenatedMergedKeyValueReader;
}
@Override
public void setConstituentInputIsReady(Input input) {
informInputReady();
}
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return concatenatedMergedKeyValueReader.getProgress();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
index 4a8969e..fa51f47 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
@@ -39,6 +39,8 @@ import org.apache.tez.runtime.library.api.KeyValuesReader;
@Public
public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
+ private ConcatenatedMergedKeyValuesReader concatenatedMergedKeyValuesReader;
+
public ConcatenatedMergedKeyValuesInput(MergedInputContext context,
List<Input> inputs) {
super(context, inputs);
@@ -88,7 +90,10 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
public Iterable<Object> getCurrentValues() throws IOException {
return currentReader.getCurrentValues();
}
-
+
+ public float getProgress() {
+ return (1.0f)*(currentReaderIndex + 1)/getInputs().size();
+ }
}
/**
@@ -97,11 +102,17 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
*/
@Override
public KeyValuesReader getReader() throws Exception {
- return new ConcatenatedMergedKeyValuesReader();
+ concatenatedMergedKeyValuesReader = new ConcatenatedMergedKeyValuesReader();
+ return concatenatedMergedKeyValuesReader;
}
@Override
public void setConstituentInputIsReady(Input input) {
informInputReady();
}
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return concatenatedMergedKeyValuesReader.getProgress();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index 9a2a23e..c86e2fb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -86,6 +86,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
private TezCounter inputKeyCounter;
private TezCounter inputValueCounter;
+ private TezCounter shuffledInputs;
private final AtomicBoolean isStarted = new AtomicBoolean(false);
@@ -114,6 +115,8 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
this.inputKeyCounter = getContext().getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
this.inputValueCounter = getContext().getCounters().findCounter(
TaskCounter.REDUCE_INPUT_RECORDS);
+ this.shuffledInputs = getContext().getCounters().findCounter(
+ TaskCounter.NUM_SHUFFLED_INPUTS);
this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, getContext().getWorkDirs());
return Collections.emptyList();
@@ -264,6 +267,20 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
}
@Override
+ public float getProgress() throws IOException, InterruptedException {
+ int totalInputs = getNumPhysicalInputs();
+ if (totalInputs != 0) {
+ synchronized (this) {
+ return ((0.5f) * this.shuffledInputs.getValue() / totalInputs) +
+ ((rawIter != null) ?
+ ((0.5f) * rawIter.getProgress().getProgress()) : 0.0f);
+ }
+ } else {
+ return 0.0f;
+ }
+ }
+
+ @Override
public void handleEvents(List<Event> inputEvents) throws IOException {
Shuffle shuffleLocalRef;
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
index 2345bbb..5d6668d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
@@ -250,5 +250,11 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput {
}
}
}
-
+
+ /**
+ * Returns the average progress of all merged inputs, or 0 when there are
+ * no inputs (instead of the NaN that 0/0 would produce).
+ */
+ public float getProgress() throws IOException, InterruptedException {
+ int numInputs = getInputs().size();
+ if (numInputs == 0) {
+ return 0.0f;
+ }
+ float totalProgress = 0.0f;
+ for (Input input : getInputs()) {
+ // The cast assumes every merged input is an OrderedGroupedKVInput.
+ totalProgress += ((OrderedGroupedKVInput) input).getProgress();
+ }
+ return totalProgress / numInputs;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index ec9a191..f83b9aa 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -291,4 +291,8 @@ public class UnorderedKVInput extends AbstractLogicalInput {
return Collections.unmodifiableSet(confKeys);
}
+
+ @Override
+ public float getProgress() {
+ // NOTE(review): kvReader appears to be created lazily (presumably in
+ // start()); report no progress rather than throwing an NPE if a progress
+ // poller asks before the reader exists.
+ if (kvReader == null) {
+ return 0.0f;
+ }
+ return kvReader.getProgress();
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
index 725f785..66c3625 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
@@ -17,17 +17,23 @@
*/
package org.apache.tez.runtime.library.processor;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.Processor;
import org.apache.tez.runtime.api.ProcessorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Implements an {@link AbstractLogicalIOProcessor} and provides empty
@@ -38,9 +44,37 @@ import org.apache.tez.runtime.api.ProcessorContext;
@Public
@Evolving
public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AbstractLogicalIOProcessor.class);
protected Map<String, LogicalInput> inputs;
protected Map<String, LogicalOutput> outputs;
+ Timer progressTimer = new Timer();
+ TimerTask progressTask = new TimerTask() {
+
+ @Override
+ public void run() {
+ try {
+ float progSum = 0.0f;
+ if (getInputs() != null) {
+ for(LogicalInput input : getInputs().values()) {
+ if (input instanceof AbstractLogicalInput) {
+ progSum += ((AbstractLogicalInput) input).getProgress();
+ }
+ }
+ float progress = (1.0f) * progSum / inputs.size();
+ getContext().setProgress(progress);
+ }
+ } catch (IOException e) {
+ LOG.warn("Encountered IOException during Processor progress update"
+ + e.getMessage());
+ } catch (InterruptedException e) {
+ LOG.warn("Encountered InterruptedException during Processor progress"
+ + "update" + e.getMessage());
+ }
+ }
+ };
+
public SimpleProcessor(ProcessorContext context) {
super(context);
}
@@ -72,6 +106,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
for (LogicalInput input : getInputs().values()) {
input.start();
}
+ progressTimer.schedule(progressTask, 0, 100);
}
if (getOutputs() != null) {
for (LogicalOutput output : getOutputs().values()) {
@@ -101,7 +136,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
@Override
public void close() throws Exception {
-
+ // Stop periodic progress reporting; Timer.cancel() is safe to call repeatedly.
+ this.progressTimer.cancel();
}
public Map<String, LogicalInput> getInputs() {
@@ -112,4 +147,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
return outputs;
}
+ /**
+ * Returns the timer that drives periodic progress reporting, e.g. so that
+ * subclasses overriding close() can cancel it themselves.
+ */
+ public Timer getProgressTimer() {
+ return this.progressTimer;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
index 91dcb6d..92bbce8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
@@ -18,17 +18,21 @@
package org.apache.tez.runtime.library.processor;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
import com.google.common.base.Charsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
@@ -47,6 +51,34 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
private static final Logger LOG = LoggerFactory.getLogger(SleepProcessor.class);
private int timeToSleepMS;
+ protected Map<String, LogicalInput> inputs;
+ protected Map<String, LogicalOutput> outputs;
+
+ Timer progressTimer = new Timer();
+ TimerTask progressTask = new TimerTask() {
+
+ @Override
+ public void run() {
+ try {
+ float progSum = 0.0f;
+ if (inputs != null) {
+ for(LogicalInput input : inputs.values()) {
+ if (input instanceof AbstractLogicalInput) {
+ progSum += ((AbstractLogicalInput) input).getProgress();
+ }
+ }
+ float progress = (1.0f) * progSum / inputs.size();
+ getContext().setProgress(progress);
+ }
+ } catch (IOException e) {
+ LOG.warn("Encountered IOException during Processor progress update" +
+ e.getMessage());
+ } catch (InterruptedException e) {
+ LOG.warn("Encountered InterruptedException during Processor progress" +
+ "update" + e.getMessage());
+ }
+ }
+ };
public SleepProcessor(ProcessorContext context) {
super(context);
@@ -69,14 +101,17 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
}
@Override
- public void run(Map<String, LogicalInput> inputs,
- Map<String, LogicalOutput> outputs) throws Exception {
+ public void run(Map<String, LogicalInput> _inputs,
+ Map<String, LogicalOutput> _outputs) throws Exception {
+ inputs = _inputs;
+ outputs = _outputs;
LOG.info("Running the Sleep Processor, sleeping for "
+ timeToSleepMS + " ms");
- for (LogicalInput input : inputs.values()) {
+ for (LogicalInput input : _inputs.values()) {
input.start();
}
- for (LogicalOutput output : outputs.values()) {
+ progressTimer.schedule(progressTask, 0, 100);
+ for (LogicalOutput output : _outputs.values()) {
output.start();
}
try {
@@ -93,7 +128,7 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
@Override
public void close() throws Exception {
- // Nothing to cleanup
+ // Stop the periodic progress reporting scheduled in run().
+ this.progressTimer.cancel();
}
/**
http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
index 6103047..513c782 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
@@ -18,8 +18,11 @@
package org.apache.tez.mapreduce.examples.processor;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
import org.apache.tez.runtime.api.TaskFailureType;
import org.slf4j.Logger;
@@ -33,6 +36,7 @@ import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
@@ -47,6 +51,33 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor {
private static final Logger LOG = LoggerFactory.getLogger(FilterByWordInputProcessor.class);
private String filterWord;
+ protected Map<String, LogicalInput> inputs;
+ protected Map<String, LogicalOutput> outputs;
+ Timer progressTimer = new Timer();
+ TimerTask progressTask = new TimerTask() {
+
+ @Override
+ public void run() {
+ try {
+ float progSum = 0.0f;
+ if (inputs != null) {
+ for(LogicalInput input : inputs.values()) {
+ if (input instanceof AbstractLogicalInput) {
+ progSum += ((AbstractLogicalInput) input).getProgress();
+ }
+ }
+ float progress = (1.0f) * progSum / inputs.size();
+ getContext().setProgress(progress);
+ }
+ } catch (IOException e) {
+ LOG.warn("Encountered IOException during Processor progress update" +
+ e.getMessage());
+ } catch (InterruptedException e) {
+ LOG.warn("Encountered InterruptedException during Processor progress" +
+ "update" + e.getMessage());
+ }
+ }
+ };
public FilterByWordInputProcessor(ProcessorContext context) {
super(context);
@@ -70,38 +101,40 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor {
@Override
public void close() throws Exception {
- LOG.info("Broadcast Processor closing. Nothing to do");
+ // Stop the periodic progress reporting scheduled in run().
+ this.progressTimer.cancel();
}
@Override
- public void run(Map<String, LogicalInput> inputs,
- Map<String, LogicalOutput> outputs) throws Exception {
+ public void run(Map<String, LogicalInput> _inputs,
+ Map<String, LogicalOutput> _outputs) throws Exception {
+ this.inputs = _inputs;
+ this.outputs = _outputs;
- if (inputs.size() != 1) {
+ if (_inputs.size() != 1) {
throw new IllegalStateException("FilterByWordInputProcessor processor can only work with a single input");
}
- if (outputs.size() != 1) {
+ if (_outputs.size() != 1) {
throw new IllegalStateException("FilterByWordInputProcessor processor can only work with a single output");
}
- for (LogicalInput input : inputs.values()) {
+ for (LogicalInput input : _inputs.values()) {
input.start();
}
- for (LogicalOutput output : outputs.values()) {
+ for (LogicalOutput output : _outputs.values()) {
output.start();
}
- LogicalInput li = inputs.values().iterator().next();
+ LogicalInput li = _inputs.values().iterator().next();
if (! (li instanceof MRInput)) {
throw new IllegalStateException("FilterByWordInputProcessor processor can only work with MRInput");
}
- LogicalOutput lo = outputs.values().iterator().next();
+ LogicalOutput lo = _outputs.values().iterator().next();
if (! (lo instanceof UnorderedKVOutput)) {
throw new IllegalStateException("FilterByWordInputProcessor processor can only work with OnFileUnorderedKVOutput");
}
-
+ progressTimer.schedule(progressTask, 0, 100);
MRInputLegacy mrInput = (MRInputLegacy) li;
mrInput.init();
UnorderedKVOutput kvOutput = (UnorderedKVOutput) lo;
http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
index 15c17fc..5872527 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
@@ -51,6 +51,7 @@ public class FilterByWordOutputProcessor extends SimpleMRProcessor {
@Override
public void close() throws Exception {
LOG.info("Broadcast Output Processor closing. Nothing to do");
+ // This override does not call super.close(), so cancel the progress
+ // timer (obtained via getProgressTimer()) here explicitly.
getProgressTimer().cancel();
}
@Override