You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2016/09/15 12:11:38 UTC

tez git commit: TEZ-3317. Speculative execution starts too early due to 0 progress (Kuhu Shukla via rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/master a23de4982 -> b17edc401


TEZ-3317. Speculative execution starts too early due to 0 progress (Kuhu Shukla via rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b17edc40
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b17edc40
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b17edc40

Branch: refs/heads/master
Commit: b17edc401f7622d8aa3fde6d45767e6e18b34c72
Parents: a23de49
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Thu Sep 15 17:41:08 2016 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Thu Sep 15 17:41:08 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/runtime/api/AbstractLogicalInput.java   |   5 +
 .../tez/runtime/api/MergedLogicalInput.java     |   5 +
 .../org/apache/tez/mapreduce/input/MRInput.java |   5 +-
 .../tez/mapreduce/input/MRInputLegacy.java      |   4 +
 .../mapreduce/processor/map/MapProcessor.java   |  61 ++++++--
 .../processor/reduce/ReduceProcessor.java       |  59 ++++++--
 .../processor/map/TestMapProcessor.java         | 142 +++++++++++++++++++
 .../api/impl/TezProcessorContextImpl.java       |   6 +-
 .../common/readers/UnorderedKVReader.java       |  15 ++
 .../common/shuffle/impl/ShuffleManager.java     |   9 ++
 .../input/ConcatenatedMergedKeyValueInput.java  |  14 +-
 .../input/ConcatenatedMergedKeyValuesInput.java |  15 +-
 .../library/input/OrderedGroupedKVInput.java    |  17 +++
 .../input/OrderedGroupedMergedKVInput.java      |   8 +-
 .../runtime/library/input/UnorderedKVInput.java |   4 +
 .../library/processor/SimpleProcessor.java      |  40 +++++-
 .../library/processor/SleepProcessor.java       |  45 +++++-
 .../processor/FilterByWordInputProcessor.java   |  53 +++++--
 .../processor/FilterByWordOutputProcessor.java  |   1 +
 20 files changed, 457 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 99bae83..ed0ef7b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3317. Speculative execution starts too early due to 0 progress.
   TEZ-3404. Move blocking call for YARN Timeline domain creation from client side to AM.
   TEZ-3272. Add AMContainerImpl and AMNodeImpl to StateMachine visualization list.
   TEZ-3284. Synchronization for every write in UnorderdKVWriter

http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
index dea79b7..4c95eb9 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
@@ -17,6 +17,7 @@
  */
 package org.apache.tez.runtime.api;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -80,4 +81,8 @@ public abstract class AbstractLogicalInput implements LogicalInput, LogicalInput
   public final InputContext getContext() {
     return inputContext;
   }
+
+  public float getProgress() throws IOException, InterruptedException {
+    return 0.0f;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
index dedc902..3195a17 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.runtime.api;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -92,4 +93,8 @@ public abstract class MergedLogicalInput implements LogicalInput {
    * Used by the framework to inform the MergedInput that one of it's constituent Inputs is ready.
    */
   public abstract void setConstituentInputIsReady(Input input);
+
+  public float getProgress() throws IOException, InterruptedException {
+    return 0.0f;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index af4b05c..b83d1a3 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -611,8 +611,9 @@ public class MRInput extends MRInputBase {
     }
   }
 
-  public float getProgress() throws IOException, InterruptedException {
-    return mrReader.getProgress();
+  @Override
+  public float getProgress() throws IOException,InterruptedException {
+    return (mrReader != null) ? mrReader.getProgress() : 0.0f;
   }
 
   void processSplitEvent(InputDataInformationEvent event)

http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
index e83c36a..9b5ed1c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
@@ -87,6 +87,10 @@ public class MRInputLegacy extends MRInput {
     return (org.apache.hadoop.mapreduce.RecordReader) mrReader.getRecordReader();
   }
 
+  public float getProgress() throws IOException, InterruptedException {
+      return super.getProgress();
+  }
+
   @Private
   public InputSplit getOldInputSplit() {
     return (InputSplit) mrReader.getSplit();

http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index 1a12a21..ed22d2b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +46,7 @@ import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
@@ -58,6 +61,35 @@ public class MapProcessor extends MRTask{
 
   private static final Logger LOG = LoggerFactory.getLogger(MapProcessor.class);
 
+  protected Map<String, LogicalInput> inputs;
+  protected Map<String, LogicalOutput> outputs;
+  Timer progressTimer = new Timer();
+  // Averages the progress of all inputs and publishes it via the context and mrReporter.
+  TimerTask progressTask = new TimerTask() {
+    @Override
+    public void run() {
+      try {
+        float progSum = 0.0f;
+        if (inputs != null && inputs.size() != 0) {
+          for(LogicalInput input : inputs.values()) {
+            if (input instanceof AbstractLogicalInput) {
+              progSum += ((AbstractLogicalInput) input).getProgress();
+            }
+          }
+          float progress = (1.0f) * progSum / inputs.size();
+          getContext().setProgress(progress);
+          mrReporter.setProgress(progress);
+        }
+      } catch (IOException e) {
+        LOG.warn("Encountered IOException during Processor progress update", e);
+      } catch (InterruptedException e) {
+        LOG.warn("Encountered InterruptedException during Processor progress update", e);
+        // Preserve the interrupt status for the thread running this task.
+        Thread.currentThread().interrupt();
+      }
+    }
+  };
+
   public MapProcessor(ProcessorContext processorContext) {
     super(processorContext, true);
   }
@@ -69,33 +101,34 @@ public class MapProcessor extends MRTask{
   }
 
   public void close() throws IOException {
-    // TODO Auto-generated method stub
+    progressTimer.cancel();
 
   }
 
   @Override
-  public void run(Map<String, LogicalInput> inputs,
-      Map<String, LogicalOutput> outputs) throws Exception {
-
+  public void run(Map<String, LogicalInput> _inputs,
+      Map<String, LogicalOutput> _outputs) throws Exception {
+    this.inputs = _inputs;
+    this.outputs = _outputs;
     LOG.info("Running map: " + processorContext.getUniqueIdentifier());
-    for (LogicalInput input : inputs.values()) {
+    for (LogicalInput input : _inputs.values()) {
       input.start();
     }
-    for (LogicalOutput output : outputs.values()) {
+    for (LogicalOutput output : _outputs.values()) {
       output.start();
     }
 
-    if (inputs.size() != 1
-        || outputs.size() != 1) {
-      throw new IOException("Cannot handle multiple inputs or outputs"
-          + ", inputCount=" + inputs.size()
-          + ", outputCount=" + outputs.size());
+    if (_inputs.size() != 1
+        || _outputs.size() != 1) {
+      throw new IOException("Cannot handle multiple _inputs or _outputs"
+          + ", inputCount=" + _inputs.size()
+          + ", outputCount=" + _outputs.size());
     }
-    LogicalInput in = inputs.values().iterator().next();
-    LogicalOutput out = outputs.values().iterator().next();
+    LogicalInput in = _inputs.values().iterator().next();
+    LogicalOutput out = _outputs.values().iterator().next();
 
     initTask(out);
-
+    progressTimer.schedule(progressTask, 0, 100);
     // Sanity check
     if (!(in instanceof MRInputLegacy)) {
       throw new IOException(new TezException(

http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 996cf84..8ec6091 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -22,6 +22,8 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +44,7 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -63,6 +66,35 @@ public class ReduceProcessor extends MRTask {
   private Counter reduceInputKeyCounter;
   private Counter reduceInputValueCounter;
 
+  protected Map<String, LogicalInput> inputs;
+  protected Map<String, LogicalOutput> outputs;
+  Timer progressTimer = new Timer();
+  // Averages the progress of all inputs and publishes it via the context and mrReporter.
+  TimerTask progressTask = new TimerTask() {
+    @Override
+    public void run() {
+      try {
+        float progSum = 0.0f;
+        if (inputs != null && inputs.size() != 0) {
+          for(LogicalInput input : inputs.values()) {
+            if (input instanceof AbstractLogicalInput) {
+              progSum += ((AbstractLogicalInput) input).getProgress();
+            }
+          }
+          float progress = (1.0f) * progSum / inputs.size();
+          getContext().setProgress(progress);
+          mrReporter.setProgress(progress);
+        }
+      } catch (IOException e) {
+        LOG.warn("Encountered IOException during Processor progress update", e);
+      } catch (InterruptedException e) {
+        LOG.warn("Encountered InterruptedException during Processor progress update", e);
+        // Preserve the interrupt status for the thread running this task.
+        Thread.currentThread().interrupt();
+      }
+    }
+  };
+
   public ReduceProcessor(ProcessorContext processorContext) {
     super(processorContext, false);
   }
@@ -74,27 +106,28 @@ public class ReduceProcessor extends MRTask {
   }
 
   public void close() throws IOException {
-    // TODO Auto-generated method stub
+    progressTimer.cancel();
 
   }
 
   @Override
-  public void run(Map<String, LogicalInput> inputs,
-      Map<String, LogicalOutput> outputs) throws Exception {
-
+  public void run(Map<String, LogicalInput> _inputs,
+      Map<String, LogicalOutput> _outputs) throws Exception {
+    this.inputs = _inputs;
+    this.outputs = _outputs;
     LOG.info("Running reduce: " + processorContext.getUniqueIdentifier());
 
-    if (outputs.size() <= 0 || outputs.size() > 1) {
-      throw new IOException("Invalid number of outputs"
-          + ", outputCount=" + outputs.size());
+    if (_outputs.size() <= 0 || _outputs.size() > 1) {
+      throw new IOException("Invalid number of _outputs"
+          + ", outputCount=" + _outputs.size());
     }
 
-    if (inputs.size() <= 0 || inputs.size() > 1) {
-      throw new IOException("Invalid number of inputs"
-          + ", inputCount=" + inputs.size());
+    if (_inputs.size() <= 0 || _inputs.size() > 1) {
+      throw new IOException("Invalid number of _inputs"
+          + ", inputCount=" + _inputs.size());
     }
 
-    LogicalInput in = inputs.values().iterator().next();
+    LogicalInput in = _inputs.values().iterator().next();
     in.start();
 
     List<Input> pendingInputs = new LinkedList<Input>();
@@ -102,11 +135,11 @@ public class ReduceProcessor extends MRTask {
     processorContext.waitForAllInputsReady(pendingInputs);
     LOG.info("Input is ready for consumption. Starting Output");
 
-    LogicalOutput out = outputs.values().iterator().next();
+    LogicalOutput out = _outputs.values().iterator().next();
     out.start();
 
     initTask(out);
-
+    progressTimer.schedule(progressTask, 0, 100);
     this.statusUpdate();
 
     Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);

http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 70f8763..53b8c46 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -21,7 +21,22 @@ package org.apache.tez.mapreduce.processor.map;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
+import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -69,6 +84,9 @@ public class TestMapProcessor {
   private static JobConf defaultConf = new JobConf();
   private static FileSystem localFs = null; 
   private static Path workDir = null;
+  static float progressUpdate = 0.0f;
+  final private static FsPermission JOB_FILE_PERMISSION = FsPermission
+      .createImmutable((short) 0644);
   static {
     try {
       defaultConf.set("fs.defaultFS", "file:///");
@@ -184,4 +202,128 @@ public class TestMapProcessor {
     }
     reader.close();
   }
+
+  @Test(timeout = 10000)
+  public void testMapProcessorProgress() throws Exception {
+    String dagName = "mrdag0";
+    String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
+    JobConf jobConf = new JobConf(defaultConf);
+    setUpJobConf(jobConf);
+
+    MRHelpers.translateMRConfToTez(jobConf);
+    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
+
+    jobConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
+
+    jobConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
+        "localized-resources").toUri().toString());
+
+    Path mapInput = new Path(workDir, "map0");
+
+
+    generateInputSplit(localFs, workDir, jobConf, mapInput);
+
+    InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
+        InputDescriptor.create(MRInputLegacy.class.getName())
+            .setUserPayload(UserPayload.create(ByteBuffer.wrap(
+                MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
+                    .setConfigurationBytes(TezUtils.createByteStringFromConf
+                        (jobConf)).build()
+                    .toByteArray()))),
+        1);
+    OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex",
+        OutputDescriptor.create(OrderedPartitionedKVOutput.class.getName())
+            .setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);
+
+    final LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask
+        (localFs, workDir, jobConf, 0,
+            new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName,
+            Collections.singletonList(mapInputSpec),
+            Collections.singletonList(mapOutputSpec));
+
+    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+    Thread monitorProgress = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        float prog = task.getProgress();
+        if(prog > 0.0 && prog < 1.0)
+          progressUpdate = prog;
+      }
+    });
+
+    task.initialize();
+    scheduler.scheduleAtFixedRate(monitorProgress, 0, 10,
+        TimeUnit.MILLISECONDS);
+    task.run();
+    Assert.assertTrue("Progress Updates should be captured!",
+        progressUpdate != 0.0f);
+    task.close();
+  }
+
+  /** Writes 100000 random-keyed records to mapInput, then writes split files for the first split. */
+  public static void generateInputSplit(FileSystem fs, Path workDir,
+                                        JobConf jobConf, Path mapInput)
+      throws IOException {
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+    FileInputFormat.setInputPaths(jobConf, workDir);
+
+    LOG.info("Generating data at path: " + mapInput);
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, jobConf, mapInput,
+        LongWritable.class, Text.class);
+    try {
+      Random r = new Random(System.currentTimeMillis());
+      LongWritable key = new LongWritable();
+      Text value = new Text();
+      for (int i = 100000; i > 0; i--) {
+        key.set(r.nextInt(1000));
+        value.set(Integer.toString(i));
+        writer.append(key, value);
+        // Per-record output stays at debug level: info would emit 100000 lines per run.
+        LOG.debug("<k, v> : <{}, {}>", key.get(), value);
+      }
+    } finally {
+      writer.close();
+    }
+
+    SequenceFileInputFormat<LongWritable, Text> format =
+        new SequenceFileInputFormat<LongWritable, Text>();
+    InputSplit[] splits = format.getSplits(jobConf, 1);
+    System.err.println("#split = " + splits.length + " ; " +
+        "#locs = " + splits[0].getLocations().length + "; " +
+        "loc = " + splits[0].getLocations()[0] + "; " +
+        "off = " + splits[0].getLength() + "; " +
+        "file = " + ((FileSplit)splits[0]).getPath());
+    writeSplitFiles(fs, jobConf, splits[0]);
+  }
+
+  private static void writeSplitFiles(FileSystem fs, JobConf conf,
+                                      InputSplit split) throws IOException {
+    Path jobSplitFile = new Path(conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR,
+        MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR_DEFAULT), MRJobConfig.JOB_SPLIT);
+    LOG.info("Writing split to: " + jobSplitFile);
+    FSDataOutputStream out = FileSystem.create(fs, jobSplitFile,
+        new FsPermission(JOB_FILE_PERMISSION));
+
+    long offset = out.getPos();
+    Text.writeString(out, split.getClass().getName());
+    split.write(out);
+    out.close();
+
+    String[] locations = split.getLocations();
+
+    JobSplit.SplitMetaInfo info = null;
+    info = new JobSplit.SplitMetaInfo(locations, offset, split.getLength());
+
+    Path jobSplitMetaInfoFile = new Path(
+        conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR),
+        MRJobConfig.JOB_SPLIT_METAINFO);
+
+    FSDataOutputStream outMeta = FileSystem.create(fs, jobSplitMetaInfoFile,
+        new FsPermission(JOB_FILE_PERMISSION));
+    outMeta.write(SplitMetaInfoReaderTez.META_SPLIT_FILE_HEADER);
+    WritableUtils.writeVInt(outMeta, SplitMetaInfoReaderTez.META_SPLIT_VERSION);
+    WritableUtils.writeVInt(outMeta, 1); // Only 1 split meta info being written
+    info.write(outMeta);
+    outMeta.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index d7c2d3e..c00b977 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -92,8 +92,10 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
 
   @Override
   public void setProgress(float progress) {
-    runtimeTask.setProgress(progress);
-    notifyProgress();
+    if (runtimeTask.getProgress() != progress) {
+      runtimeTask.setProgress(progress);
+      notifyProgress();
+    }
   }
 
   @SuppressWarnings("deprecation")

http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
index 57bb121..3f44c4f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
@@ -73,6 +73,8 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
   // TODO Remove this once per I/O counters are separated properly. Relying on
   // the counter at the moment will generate aggregate numbers. 
   private int numRecordsRead = 0;
+  private long totalBytesRead = 0;
+  private long totalFileBytes = 0;
 
 
   public UnorderedKVReader(ShuffleManager shuffleManager, Configuration conf,
@@ -146,6 +148,17 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
     return value;
   }
 
+  /** Fraction of opened-file bytes read so far, scaled by the fraction of completed inputs. */
+  public float getProgress() {
+    int numInputs = shuffleManager.getNumInputs();
+    if (totalFileBytes <= 0 || numInputs <= 0) {
+      return 0.0f;
+    }
+    long bytesRead = totalBytesRead + ((currentReader != null) ? currentReader.bytesRead : 0L);
+    float readFraction = (1.0f * bytesRead) / totalFileBytes;
+    float completedFraction = shuffleManager.getNumCompletedInputs().floatValue() / numInputs;
+    return readFraction * completedFraction;
+  }
   /**
    * Tries reading the next key and value from the current reader.
    * @return true if the current reader has more records
@@ -176,6 +189,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
    */
   private boolean moveToNextInput() throws IOException {
     if (currentReader != null) { // Close the current reader.
+      totalBytesRead += currentReader.bytesRead;
       currentReader.close();
       /**
        * clear reader explicitly. Otherwise this could point to stale reference when next() is
@@ -196,6 +210,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
       return false; // No more inputs
     } else {
       currentReader = openIFileReader(currentFetchedInput);
+      totalFileBytes += currentReader.getLength();
       return true;
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index c80713b..a726924 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -875,6 +875,15 @@ public class ShuffleManager implements FetcherCallback {
     } while (input instanceof NullFetchedInput);
     return input;
   }
+
+  public int getNumInputs() {
+    return numInputs;
+  }
+
+  public AtomicInteger getNumCompletedInputs() {
+    return numCompletedInputs;
+  }
+
   /////////////////// End of methods for walking the available inputs
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
index 0b8ed21..743b628 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
@@ -37,6 +37,7 @@ import org.apache.tez.runtime.library.api.KeyValueReader;
  */
 @Public
 public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
+  private ConcatenatedMergedKeyValueReader concatenatedMergedKeyValueReader;
 
   public ConcatenatedMergedKeyValueInput(MergedInputContext context,
                                          List<Input> inputs) {
@@ -87,7 +88,10 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
     public Object getCurrentValue() throws IOException {
       return currentReader.getCurrentValue();
     }
-    
+
+    public float getProgress() {
+      return (1.0f)*(currentReaderIndex + 1)/getInputs().size();
+    }
   }
 
   /**
@@ -96,11 +100,17 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
    */
   @Override
   public KeyValueReader getReader() throws Exception {
-    return new ConcatenatedMergedKeyValueReader();
+    concatenatedMergedKeyValueReader = new ConcatenatedMergedKeyValueReader();
+    return concatenatedMergedKeyValueReader;
   }
 
   @Override
   public void setConstituentInputIsReady(Input input) {
     informInputReady();
   }
+
+  @Override
+  public float getProgress()  throws IOException, InterruptedException {
+    return concatenatedMergedKeyValueReader.getProgress();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
index 4a8969e..fa51f47 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
@@ -39,6 +39,8 @@ import org.apache.tez.runtime.library.api.KeyValuesReader;
 @Public
 public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
 
+  private ConcatenatedMergedKeyValuesReader concatenatedMergedKeyValuesReader;
+
   public ConcatenatedMergedKeyValuesInput(MergedInputContext context,
                                           List<Input> inputs) {
     super(context, inputs);
@@ -88,7 +90,10 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
     public Iterable<Object> getCurrentValues() throws IOException {
       return currentReader.getCurrentValues();
     }
-    
+
+    public float getProgress() {
+      return (1.0f)*(currentReaderIndex + 1)/getInputs().size();
+    }
   }
    
   /**
@@ -97,11 +102,17 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
    */
   @Override
   public KeyValuesReader getReader() throws Exception {
-    return new ConcatenatedMergedKeyValuesReader();
+    concatenatedMergedKeyValuesReader = new ConcatenatedMergedKeyValuesReader();
+    return concatenatedMergedKeyValuesReader;
   }
 
   @Override
   public void setConstituentInputIsReady(Input input) {
     informInputReady();
   }
+
+  @Override
+  public float getProgress()  throws IOException, InterruptedException {
+    return concatenatedMergedKeyValuesReader.getProgress();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index 9a2a23e..c86e2fb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -86,6 +86,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
 
   private TezCounter inputKeyCounter;
   private TezCounter inputValueCounter;
+  private TezCounter shuffledInputs;
 
   private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
@@ -114,6 +115,8 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
     this.inputKeyCounter = getContext().getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
     this.inputValueCounter = getContext().getCounters().findCounter(
         TaskCounter.REDUCE_INPUT_RECORDS);
+     this.shuffledInputs = getContext().getCounters().findCounter(
+        TaskCounter.NUM_SHUFFLED_INPUTS);
     this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, getContext().getWorkDirs());
 
     return Collections.emptyList();
@@ -264,6 +267,20 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
   }
 
   @Override
+  public float getProgress()  throws IOException, InterruptedException {
+    int totalInputs = getNumPhysicalInputs();
+    if (totalInputs != 0) {
+      synchronized (this) {
+        return ((0.5f) * this.shuffledInputs.getValue() / totalInputs) +
+            ((rawIter != null) ?
+             ((0.5f) * rawIter.getProgress().getProgress()) : 0.0f);
+      }
+    } else {
+      return 0.0f;
+    }
+  }
+
+  @Override
   public void handleEvents(List<Event> inputEvents) throws IOException {
     Shuffle shuffleLocalRef;
     synchronized (this) {

http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
index 2345bbb..5d6668d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
@@ -250,5 +250,11 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput {
       }
     }
   }
-
+  public float getProgress() throws IOException, InterruptedException {
+    float totalProgress = 0.0f;
+    for(Input input : getInputs()) {
+      totalProgress += ((OrderedGroupedKVInput)input).getProgress();
+    }
+    return (1.0f) * totalProgress/getInputs().size();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index ec9a191..f83b9aa 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -291,4 +291,8 @@ public class UnorderedKVInput extends AbstractLogicalInput {
     return Collections.unmodifiableSet(confKeys);
   }
 
+  @Override
+  public float getProgress() {
+    return kvReader.getProgress();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
index 725f785..66c3625 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
@@ -17,17 +17,23 @@
  */
 package org.apache.tez.runtime.library.processor;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.Processor;
 import org.apache.tez.runtime.api.ProcessorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implements an {@link AbstractLogicalIOProcessor} and provides empty
@@ -38,9 +44,37 @@ import org.apache.tez.runtime.api.ProcessorContext;
 @Public
 @Evolving
 public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractLogicalIOProcessor.class);
   protected Map<String, LogicalInput> inputs;
   protected Map<String, LogicalOutput> outputs;
 
+  Timer progressTimer = new Timer();
+  TimerTask progressTask = new TimerTask() {
+
+    @Override
+    public void run() {
+      try {
+        float progSum = 0.0f;
+        if (getInputs() != null) {
+          for(LogicalInput input : getInputs().values()) {
+            if (input instanceof AbstractLogicalInput) {
+              progSum += ((AbstractLogicalInput) input).getProgress();
+            }
+          }
+          float progress = (1.0f) * progSum / inputs.size();
+          getContext().setProgress(progress);
+        }
+      } catch (IOException e) {
+        LOG.warn("Encountered IOException during Processor progress update"
+            + e.getMessage());
+      } catch (InterruptedException e) {
+        LOG.warn("Encountered InterruptedException during Processor progress"
+            + "update" + e.getMessage());
+      }
+    }
+  };
+
   public SimpleProcessor(ProcessorContext context) {
     super(context);
   }
@@ -72,6 +106,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
       for (LogicalInput input : getInputs().values()) {
         input.start();
       }
+      progressTimer.schedule(progressTask, 0, 100);
     }
     if (getOutputs() != null) {
       for (LogicalOutput output : getOutputs().values()) {
@@ -101,7 +136,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
 
   @Override
   public void close() throws Exception {
-
+    progressTimer.cancel();
   }
 
   public Map<String, LogicalInput> getInputs() {
@@ -112,4 +147,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
     return outputs;
   }
 
+  public Timer getProgressTimer() {
+    return progressTimer;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
index 91dcb6d..92bbce8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
@@ -18,17 +18,21 @@
 
 package org.apache.tez.runtime.library.processor;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
 import java.util.List;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import com.google.common.base.Charsets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -47,6 +51,34 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
   private static final Logger LOG = LoggerFactory.getLogger(SleepProcessor.class);
 
   private int timeToSleepMS;
+  protected Map<String, LogicalInput> inputs;
+  protected Map<String, LogicalOutput> outputs;
+
+  Timer progressTimer = new Timer();
+  TimerTask progressTask = new TimerTask() {
+
+    @Override
+    public void run() {
+      try {
+        float progSum = 0.0f;
+        if (inputs != null) {
+          for(LogicalInput input : inputs.values()) {
+            if (input instanceof AbstractLogicalInput) {
+              progSum += ((AbstractLogicalInput) input).getProgress();
+            }
+          }
+          float progress = (1.0f) * progSum / inputs.size();
+          getContext().setProgress(progress);
+        }
+      } catch (IOException e) {
+        LOG.warn("Encountered IOException during Processor progress update" +
+            e.getMessage());
+      } catch (InterruptedException e) {
+        LOG.warn("Encountered InterruptedException during Processor progress" +
+            "update" + e.getMessage());
+      }
+    }
+  };
 
   public SleepProcessor(ProcessorContext context) {
     super(context);
@@ -69,14 +101,17 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
   }
 
   @Override
-  public void run(Map<String, LogicalInput> inputs,
-                  Map<String, LogicalOutput> outputs) throws Exception {
+  public void run(Map<String, LogicalInput> _inputs,
+                  Map<String, LogicalOutput> _outputs) throws Exception {
+    inputs = _inputs;
+    outputs = _outputs;
     LOG.info("Running the Sleep Processor, sleeping for "
       + timeToSleepMS + " ms");
-    for (LogicalInput input : inputs.values()) {
+    for (LogicalInput input : _inputs.values()) {
       input.start();
     }
-    for (LogicalOutput output : outputs.values()) {
+    progressTimer.schedule(progressTask, 0, 100);
+    for (LogicalOutput output : _outputs.values()) {
       output.start();
     }
     try {
@@ -93,7 +128,7 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
 
   @Override
   public void close() throws Exception {
-    // Nothing to cleanup
+    progressTimer.cancel();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
index 6103047..513c782 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
@@ -18,8 +18,11 @@
 
 package org.apache.tez.mapreduce.examples.processor;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
@@ -33,6 +36,7 @@ import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -47,6 +51,33 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor {
   private static final Logger LOG = LoggerFactory.getLogger(FilterByWordInputProcessor.class);
 
   private String filterWord;
+  protected Map<String, LogicalInput> inputs;
+  protected Map<String, LogicalOutput> outputs;
+  Timer progressTimer = new Timer();
+  TimerTask progressTask = new TimerTask() {
+
+    @Override
+    public void run() {
+      try {
+        float progSum = 0.0f;
+        if (inputs != null) {
+          for(LogicalInput input : inputs.values()) {
+            if (input instanceof AbstractLogicalInput) {
+              progSum += ((AbstractLogicalInput) input).getProgress();
+            }
+          }
+          float progress = (1.0f) * progSum / inputs.size();
+          getContext().setProgress(progress);
+        }
+      } catch (IOException e) {
+        LOG.warn("Encountered IOException during Processor progress update" +
+            e.getMessage());
+      } catch (InterruptedException e) {
+        LOG.warn("Encountered InterruptedException during Processor progress" +
+            "update" + e.getMessage());
+      }
+    }
+  };
 
   public FilterByWordInputProcessor(ProcessorContext context) {
     super(context);
@@ -70,38 +101,40 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor {
 
   @Override
   public void close() throws Exception {
-    LOG.info("Broadcast Processor closing. Nothing to do");
+    progressTimer.cancel();
   }
 
   @Override
-  public void run(Map<String, LogicalInput> inputs,
-      Map<String, LogicalOutput> outputs) throws Exception {
+  public void run(Map<String, LogicalInput> _inputs,
+      Map<String, LogicalOutput> _outputs) throws Exception {
+    this.inputs = _inputs;
+    this.outputs = _outputs;
     
-    if (inputs.size() != 1) {
+    if (_inputs.size() != 1) {
       throw new IllegalStateException("FilterByWordInputProcessor processor can only work with a single input");
     }
 
-    if (outputs.size() != 1) {
+    if (_outputs.size() != 1) {
       throw new IllegalStateException("FilterByWordInputProcessor processor can only work with a single output");
     }
     
-    for (LogicalInput input : inputs.values()) {
+    for (LogicalInput input : _inputs.values()) {
       input.start();
     }
-    for (LogicalOutput output : outputs.values()) {
+    for (LogicalOutput output : _outputs.values()) {
       output.start();
     }
 
-    LogicalInput li = inputs.values().iterator().next();
+    LogicalInput li = _inputs.values().iterator().next();
     if (! (li instanceof MRInput)) {
       throw new IllegalStateException("FilterByWordInputProcessor processor can only work with MRInput");
     }
 
-    LogicalOutput lo = outputs.values().iterator().next();
+    LogicalOutput lo = _outputs.values().iterator().next();
     if (! (lo instanceof UnorderedKVOutput)) {
       throw new IllegalStateException("FilterByWordInputProcessor processor can only work with OnFileUnorderedKVOutput");
     }
-
+    progressTimer.schedule(progressTask, 0, 100);
     MRInputLegacy mrInput = (MRInputLegacy) li;
     mrInput.init();
     UnorderedKVOutput kvOutput = (UnorderedKVOutput) lo;

http://git-wip-us.apache.org/repos/asf/tez/blob/b17edc40/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
index 15c17fc..5872527 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
@@ -51,6 +51,7 @@ public class FilterByWordOutputProcessor extends SimpleMRProcessor {
   @Override
   public void close() throws Exception {
     LOG.info("Broadcast Output Processor closing. Nothing to do");
+    getProgressTimer().cancel();
   }
 
   @Override