You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2016/10/26 18:06:12 UTC

[1/2] tez git commit: TEZ-3317. Speculative execution starts too early due to 0 progress. Contributed by Kuhu Shukla.

Repository: tez
Updated Branches:
  refs/heads/branch-0.8 7f8687f24 -> 85fdc4128


TEZ-3317. Speculative execution starts too early due to 0 progress. Contributed by Kuhu Shukla.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0daa1ec3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0daa1ec3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0daa1ec3

Branch: refs/heads/branch-0.8
Commit: 0daa1ec386edab921f313cd612a86beeb05da629
Parents: 7f8687f
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Oct 26 11:02:04 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Oct 26 11:02:04 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../tez/runtime/api/AbstractLogicalInput.java   |   5 +
 .../tez/runtime/api/MergedLogicalInput.java     |   5 +
 .../org/apache/tez/mapreduce/input/MRInput.java |   5 +-
 .../tez/mapreduce/input/MRInputLegacy.java      |   4 +
 .../mapreduce/processor/map/MapProcessor.java   |  58 ++++++--
 .../processor/reduce/ReduceProcessor.java       |  59 ++++++--
 .../processor/map/TestMapProcessor.java         | 142 +++++++++++++++++++
 .../api/impl/TezProcessorContextImpl.java       |   6 +-
 .../common/readers/UnorderedKVReader.java       |  15 ++
 .../common/shuffle/impl/ShuffleManager.java     |   9 ++
 .../input/ConcatenatedMergedKeyValueInput.java  |  14 +-
 .../input/ConcatenatedMergedKeyValuesInput.java |  15 +-
 .../library/input/OrderedGroupedKVInput.java    |  17 +++
 .../input/OrderedGroupedMergedKVInput.java      |   8 +-
 .../runtime/library/input/UnorderedKVInput.java |   4 +
 .../library/processor/SimpleProcessor.java      |  40 +++++-
 .../library/processor/SleepProcessor.java       |  45 +++++-
 .../processor/FilterByWordInputProcessor.java   |  53 +++++--
 .../processor/FilterByWordOutputProcessor.java  |   1 +
 20 files changed, 457 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/0daa1ec3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fce5e1c..06dcc0f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3317. Speculative execution starts too early due to 0 progress.
   TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs
   TEZ-3439. Tez joinvalidate fails when first input argument size is bigger than the second.
   TEZ-3464. Fix findbugs warnings in tez-dag mainLoop
@@ -517,6 +518,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3317. Speculative execution starts too early due to 0 progress.
   TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs
   TEZ-3439. Tez joinvalidate fails when first input argument size is bigger than the second.
   TEZ-3464. Fix findbugs warnings in tez-dag mainLoop

http://git-wip-us.apache.org/repos/asf/tez/blob/0daa1ec3/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
index dea79b7..4c95eb9 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
@@ -17,6 +17,7 @@
  */
 package org.apache.tez.runtime.api;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -80,4 +81,8 @@ public abstract class AbstractLogicalInput implements LogicalInput, LogicalInput
   public final InputContext getContext() {
     return inputContext;
   }
+
+  public float getProgress() throws IOException, InterruptedException {
+    return 0.0f;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0daa1ec3/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
index dedc902..3195a17 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.runtime.api;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -92,4 +93,8 @@ public abstract class MergedLogicalInput implements LogicalInput {
    * Used by the framework to inform the MergedInput that one of it's constituent Inputs is ready.
    */
   public abstract void setConstituentInputIsReady(Input input);
+
+  public float getProgress() throws IOException, InterruptedException {
+    return 0.0f;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0daa1ec3/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index af4b05c..b83d1a3 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -611,8 +611,9 @@ public class MRInput extends MRInputBase {
     }
   }
 
-  public float getProgress() throws IOException, InterruptedException {
-    return mrReader.getProgress();
+  @Override
+  public float getProgress() throws IOException,InterruptedException {
+    return (mrReader != null) ? mrReader.getProgress() : 0.0f;
   }
 
   void processSplitEvent(InputDataInformationEvent event)

http://git-wip-us.apache.org/repos/asf/tez/blob/0daa1ec3/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
index e83c36a..9b5ed1c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
@@ -87,6 +87,10 @@ public class MRInputLegacy extends MRInput {
     return (org.apache.hadoop.mapreduce.RecordReader) mrReader.getRecordReader();
   }
 
+  public float getProgress() throws IOException, InterruptedException {
+      return super.getProgress();
+  }
+
   @Private
   public InputSplit getOldInputSplit() {
     return (InputSplit) mrReader.getSplit();

http://git-wip-us.apache.org/repos/asf/tez/blob/0daa1ec3/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index 5ef0d2b..ecdacf9 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +46,7 @@ import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
@@ -58,6 +61,35 @@ public class MapProcessor extends MRTask{
 
   private static final Logger LOG = LoggerFactory.getLogger(MapProcessor.class);
 
+  protected Map<String, LogicalInput> inputs;
+  protected Map<String, LogicalOutput> outputs;
+  Timer progressTimer = new Timer(true); // daemon: must never block JVM exit
+  TimerTask progressTask = new TimerTask() {
+    @Override
+    public void run() {
+      try {
+        float progSum = 0.0f;
+        if (inputs != null && inputs.size() != 0) {
+          for (LogicalInput input : inputs.values()) {
+            if (input instanceof AbstractLogicalInput) {
+              progSum += ((AbstractLogicalInput) input).getProgress();
+            }
+          }
+          float progress = (1.0f) * progSum / inputs.size();
+          getContext().setProgress(progress);
+          mrReporter.setProgress(progress);
+        }
+      } catch (IOException e) {
+        LOG.warn("Encountered IOException during Processor progress update: "
+            + e.getMessage());
+      } catch (InterruptedException e) {
+        LOG.warn("Encountered InterruptedException during Processor progress"
+            + " update: " + e.getMessage());
+        Thread.currentThread().interrupt(); // preserve the interrupt status
+      }
+    }
+  };
+
   public MapProcessor(ProcessorContext processorContext) {
     super(processorContext, true);
   }
@@ -69,34 +101,36 @@ public class MapProcessor extends MRTask{
   }
 
   public void close() throws IOException {
-    // TODO Auto-generated method stub
+    progressTimer.cancel();
 
   }
 
   @Override
-  public void run(Map<String, LogicalInput> inputs,
-      Map<String, LogicalOutput> outputs) throws Exception {
-
+  public void run(Map<String, LogicalInput> _inputs,
+      Map<String, LogicalOutput> _outputs) throws Exception {
+    this.inputs = _inputs;
+    this.outputs = _outputs;
     LOG.info("Running map: " + processorContext.getUniqueIdentifier());
 
-    if (inputs.size() != 1
-           || outputs.size() != 1) {
+    if (_inputs.size() != 1
+           || _outputs.size() != 1) {
       throw new IOException("Cannot handle multiple _inputs or _outputs"
-              + ", inputCount=" + inputs.size()
-              + ", outputCount=" + outputs.size());
+              + ", inputCount=" + _inputs.size()
+              + ", outputCount=" + _outputs.size());
     }
 
-    for (LogicalInput input : inputs.values()) {
+    for (LogicalInput input : _inputs.values()) {
       input.start();
     }
-    for (LogicalOutput output : outputs.values()) {
+    for (LogicalOutput output : _outputs.values()) {
       output.start();
     }
 
-    LogicalInput in = inputs.values().iterator().next();
-    LogicalOutput out = outputs.values().iterator().next();
+    LogicalInput in = _inputs.values().iterator().next();
+    LogicalOutput out = _outputs.values().iterator().next();
 
     initTask(out);
+    progressTimer.schedule(progressTask, 0, 100);
 
     // Sanity check
     if (!(in instanceof MRInputLegacy)) {

http://git-wip-us.apache.org/repos/asf/tez/blob/0daa1ec3/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 996cf84..8ec6091 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -22,6 +22,8 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +44,7 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -63,6 +66,35 @@ public class ReduceProcessor extends MRTask {
   private Counter reduceInputKeyCounter;
   private Counter reduceInputValueCounter;
 
+  protected Map<String, LogicalInput> inputs;
+  protected Map<String, LogicalOutput> outputs;
+  Timer progressTimer = new Timer(true); // daemon: must never block JVM exit
+  TimerTask progressTask = new TimerTask() {
+    @Override
+    public void run() {
+      try {
+        float progSum = 0.0f;
+        if (inputs != null && inputs.size() != 0) {
+          for (LogicalInput input : inputs.values()) {
+            if (input instanceof AbstractLogicalInput) {
+              progSum += ((AbstractLogicalInput) input).getProgress();
+            }
+          }
+          float progress = (1.0f) * progSum / inputs.size();
+          getContext().setProgress(progress);
+          mrReporter.setProgress(progress);
+        }
+      } catch (IOException e) {
+        LOG.warn("Encountered IOException during Processor progress update: "
+            + e.getMessage());
+      } catch (InterruptedException e) {
+        LOG.warn("Encountered InterruptedException during Processor progress"
+            + " update: " + e.getMessage());
+        Thread.currentThread().interrupt(); // preserve the interrupt status
+      }
+    }
+  };
+
   public ReduceProcessor(ProcessorContext processorContext) {
     super(processorContext, false);
   }
@@ -74,27 +106,28 @@ public class ReduceProcessor extends MRTask {
   }
 
   public void close() throws IOException {
-    // TODO Auto-generated method stub
+    progressTimer.cancel();
 
   }
 
   @Override
-  public void run(Map<String, LogicalInput> inputs,
-      Map<String, LogicalOutput> outputs) throws Exception {
-
+  public void run(Map<String, LogicalInput> _inputs,
+      Map<String, LogicalOutput> _outputs) throws Exception {
+    this.inputs = _inputs;
+    this.outputs = _outputs;
     LOG.info("Running reduce: " + processorContext.getUniqueIdentifier());
 
-    if (outputs.size() <= 0 || outputs.size() > 1) {
-      throw new IOException("Invalid number of outputs"
-          + ", outputCount=" + outputs.size());
+    if (_outputs.size() <= 0 || _outputs.size() > 1) {
+      throw new IOException("Invalid number of outputs"
+          + ", outputCount=" + _outputs.size());
     }
 
-    if (inputs.size() <= 0 || inputs.size() > 1) {
-      throw new IOException("Invalid number of inputs"
-          + ", inputCount=" + inputs.size());
+    if (_inputs.size() <= 0 || _inputs.size() > 1) {
+      throw new IOException("Invalid number of inputs"
+          + ", inputCount=" + _inputs.size());
     }
 
-    LogicalInput in = inputs.values().iterator().next();
+    LogicalInput in = _inputs.values().iterator().next();
     in.start();
 
     List<Input> pendingInputs = new LinkedList<Input>();
@@ -102,11 +135,11 @@ public class ReduceProcessor extends MRTask {
     processorContext.waitForAllInputsReady(pendingInputs);
     LOG.info("Input is ready for consumption. Starting Output");
 
-    LogicalOutput out = outputs.values().iterator().next();
+    LogicalOutput out = _outputs.values().iterator().next();
     out.start();
 
     initTask(out);
-
+    progressTimer.schedule(progressTask, 0, 100);
     this.statusUpdate();
 
     Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);

http://git-wip-us.apache.org/repos/asf/tez/blob/0daa1ec3/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 70f8763..53b8c46 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -21,7 +21,22 @@ package org.apache.tez.mapreduce.processor.map;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
+import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -69,6 +84,9 @@ public class TestMapProcessor {
   private static JobConf defaultConf = new JobConf();
   private static FileSystem localFs = null; 
   private static Path workDir = null;
+  static volatile float progressUpdate = 0.0f; // written by the monitor thread
+  final private static FsPermission JOB_FILE_PERMISSION = FsPermission
+      .createImmutable((short) 0644);
   static {
     try {
       defaultConf.set("fs.defaultFS", "file:///");
@@ -184,4 +202,128 @@ public class TestMapProcessor {
     }
     reader.close();
   }
+
+  @Test(timeout = 10000)
+  public void testMapProcessorProgress() throws Exception {
+    String dagName = "mrdag0";
+    String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
+    JobConf jobConf = new JobConf(defaultConf);
+    setUpJobConf(jobConf);
+
+    MRHelpers.translateMRConfToTez(jobConf);
+    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
+
+    jobConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
+
+    jobConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
+        "localized-resources").toUri().toString());
+
+    Path mapInput = new Path(workDir, "map0");
+    generateInputSplit(localFs, workDir, jobConf, mapInput);
+
+    InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
+        InputDescriptor.create(MRInputLegacy.class.getName())
+            .setUserPayload(UserPayload.create(ByteBuffer.wrap(
+                MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
+                    .setConfigurationBytes(TezUtils.createByteStringFromConf
+                        (jobConf)).build()
+                    .toByteArray()))),
+        1);
+    OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex",
+        OutputDescriptor.create(OrderedPartitionedKVOutput.class.getName())
+            .setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);
+
+    final LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask
+        (localFs, workDir, jobConf, 0,
+            new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName,
+            Collections.singletonList(mapInputSpec),
+            Collections.singletonList(mapOutputSpec));
+
+    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+    Runnable monitorProgress = new Runnable() { // records any in-flight progress
+      @Override
+      public void run() {
+        float prog = task.getProgress();
+        if (prog > 0.0 && prog < 1.0) {
+          progressUpdate = prog;
+        }
+      }
+    };
+
+    task.initialize();
+    scheduler.scheduleAtFixedRate(monitorProgress, 0, 10,
+        TimeUnit.MILLISECONDS);
+    task.run();
+    scheduler.shutdownNow(); // pool thread is non-daemon; do not leak it
+    Assert.assertTrue("Progress Updates should be captured!",
+        progressUpdate != 0.0f);
+    task.close();
+  }
+
+  public static void generateInputSplit(FileSystem fs, Path workDir,
+                                        JobConf jobConf, Path mapInput)
+      throws IOException {
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+    FileInputFormat.setInputPaths(jobConf, workDir);
+
+    LOG.info("Generating data at path: " + mapInput);
+    // create a sequence file with 100000 entries (random keys in [0, 1000))
+    SequenceFile.Writer writer =
+        SequenceFile.createWriter(fs, jobConf, mapInput,
+            LongWritable.class, Text.class);
+    try {
+      Random r = new Random(System.currentTimeMillis());
+      LongWritable key = new LongWritable();
+      Text value = new Text();
+      for (int i = 100000; i > 0; i--) {
+        key.set(r.nextInt(1000));
+        value.set(Integer.toString(i));
+        writer.append(key, value);
+        LOG.debug("<k, v> : <{}, {}>", key.get(), value); // 100k records: keep cheap
+      }
+    } finally {
+      writer.close();
+    }
+
+    SequenceFileInputFormat<LongWritable, Text> format =
+        new SequenceFileInputFormat<LongWritable, Text>();
+    InputSplit[] splits = format.getSplits(jobConf, 1);
+    System.err.println("#split = " + splits.length + " ; " +
+        "#locs = " + splits[0].getLocations().length + "; " +
+        "loc = " + splits[0].getLocations()[0] + "; " +
+        "off = " + splits[0].getLength() + "; " +
+        "file = " + ((FileSplit)splits[0]).getPath());
+    writeSplitFiles(fs, jobConf, splits[0]);
+  }
+
+  private static void writeSplitFiles(FileSystem fs, JobConf conf,
+                                      InputSplit split) throws IOException {
+    Path jobSplitFile = new Path(conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR,
+        MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR_DEFAULT), MRJobConfig.JOB_SPLIT);
+    LOG.info("Writing split to: " + jobSplitFile);
+    FSDataOutputStream out = FileSystem.create(fs, jobSplitFile,
+        new FsPermission(JOB_FILE_PERMISSION));
+
+    long offset = out.getPos();
+    Text.writeString(out, split.getClass().getName());
+    split.write(out);
+    out.close();
+
+    String[] locations = split.getLocations();
+
+    JobSplit.SplitMetaInfo info = null;
+    info = new JobSplit.SplitMetaInfo(locations, offset, split.getLength());
+
+    Path jobSplitMetaInfoFile = new Path(
+        conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR),
+        MRJobConfig.JOB_SPLIT_METAINFO);
+
+    FSDataOutputStream outMeta = FileSystem.create(fs, jobSplitMetaInfoFile,
+        new FsPermission(JOB_FILE_PERMISSION));
+    outMeta.write(SplitMetaInfoReaderTez.META_SPLIT_FILE_HEADER);
+    WritableUtils.writeVInt(outMeta, SplitMetaInfoReaderTez.META_SPLIT_VERSION);
+    WritableUtils.writeVInt(outMeta, 1); // Only 1 split meta info being written
+    info.write(outMeta);
+    outMeta.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0daa1ec3/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index d7c2d3e..c00b977 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -92,8 +92,10 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
 
   @Override
   public void setProgress(float progress) {
-    runtimeTask.setProgress(progress);
-    notifyProgress();
+    if (runtimeTask.getProgress() != progress) {
+      runtimeTask.setProgress(progress);
+      notifyProgress();
+    }
   }
 
   @SuppressWarnings("deprecation")

http://git-wip-us.apache.org/repos/asf/tez/blob/0daa1ec3/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
index 57bb121..3f44c4f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
@@ -73,6 +73,8 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
   // TODO Remove this once per I/O counters are separated properly. Relying on
   // the counter at the moment will generate aggregate numbers. 
   private int numRecordsRead = 0;
+  private long totalBytesRead = 0;
+  private long totalFileBytes = 0;
 
 
   public UnorderedKVReader(ShuffleManager shuffleManager, Configuration conf,
@@ -146,6 +148,17 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
     return value;
   }
 
+  public float getProgress() {
+    int numInputs = shuffleManager.getNumInputs();
+    if (totalFileBytes > 0 && numInputs > 0) {
+      // (bytes read / bytes of opened inputs) scaled by (inputs fetched / total inputs)
+      long currentBytes = (currentReader != null) ? currentReader.bytesRead : 0L;
+      float readFraction = (1.0f * (totalBytesRead + currentBytes)) / totalFileBytes;
+      float fetchedFraction = shuffleManager.getNumCompletedInputs().floatValue() / numInputs;
+      return readFraction * fetchedFraction;
+    }
+    return 0.0f;
+  }
   /**
    * Tries reading the next key and value from the current reader.
    * @return true if the current reader has more records
@@ -176,6 +189,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
    */
   private boolean moveToNextInput() throws IOException {
     if (currentReader != null) { // Close the current reader.
+      totalBytesRead += currentReader.bytesRead;
       currentReader.close();
       /**
        * clear reader explicitly. Otherwise this could point to stale reference when next() is
@@ -196,6 +210,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
       return false; // No more inputs
     } else {
       currentReader = openIFileReader(currentFetchedInput);
+      totalFileBytes += currentReader.getLength();
       return true;
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/0daa1ec3/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 4f7d348..4702e23 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -862,6 +862,15 @@ public class ShuffleManager implements FetcherCallback {
     } while (input instanceof NullFetchedInput);
     return input;
   }
+
+  public int getNumInputs() {
+    return numInputs;
+  }
+
+  public AtomicInteger getNumCompletedInputs() {
+    return numCompletedInputs;
+  }
+
   /////////////////// End of methods for walking the available inputs
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/0daa1ec3/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
index 0b8ed21..743b628 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
@@ -37,6 +37,7 @@ import org.apache.tez.runtime.library.api.KeyValueReader;
  */
 @Public
 public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
+  private ConcatenatedMergedKeyValueReader concatenatedMergedKeyValueReader;
 
   public ConcatenatedMergedKeyValueInput(MergedInputContext context,
                                          List<Input> inputs) {
@@ -87,7 +88,10 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
     public Object getCurrentValue() throws IOException {
       return currentReader.getCurrentValue();
     }
-    
+
+    public float getProgress() {
+      return (1.0f)*(currentReaderIndex + 1)/getInputs().size();
+    }
   }
 
   /**
@@ -96,11 +100,18 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
    */
   @Override
   public KeyValueReader getReader() throws Exception {
-    return new ConcatenatedMergedKeyValueReader();
+    concatenatedMergedKeyValueReader = new ConcatenatedMergedKeyValueReader();
+    return concatenatedMergedKeyValueReader;
   }
 
   @Override
   public void setConstituentInputIsReady(Input input) {
     informInputReady();
   }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    ConcatenatedMergedKeyValueReader reader = concatenatedMergedKeyValueReader;
+    return (reader != null) ? reader.getProgress() : 0.0f; // null until getReader()
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/0daa1ec3/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
index 4a8969e..fa51f47 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
@@ -39,6 +39,8 @@ import org.apache.tez.runtime.library.api.KeyValuesReader;
 @Public
 public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
 
+  private ConcatenatedMergedKeyValuesReader concatenatedMergedKeyValuesReader;
+
   public ConcatenatedMergedKeyValuesInput(MergedInputContext context,
                                           List<Input> inputs) {
     super(context, inputs);
@@ -88,7 +90,10 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
     public Iterable<Object> getCurrentValues() throws IOException {
       return currentReader.getCurrentValues();
     }
-    
+
+    public float getProgress() {
+      return (1.0f)*(currentReaderIndex + 1)/getInputs().size();
+    }
   }
    
   /**
@@ -97,11 +102,18 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
    */
   @Override
   public KeyValuesReader getReader() throws Exception {
-    return new ConcatenatedMergedKeyValuesReader();
+    concatenatedMergedKeyValuesReader = new ConcatenatedMergedKeyValuesReader();
+    return concatenatedMergedKeyValuesReader;
   }
 
   @Override
   public void setConstituentInputIsReady(Input input) {
     informInputReady();
   }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    ConcatenatedMergedKeyValuesReader reader = concatenatedMergedKeyValuesReader;
+    return (reader != null) ? reader.getProgress() : 0.0f; // null until getReader()
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/0daa1ec3/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index 9a2a23e..c86e2fb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -86,6 +86,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
 
   private TezCounter inputKeyCounter;
   private TezCounter inputValueCounter;
+  private TezCounter shuffledInputs;
 
   private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
@@ -114,6 +115,8 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
     this.inputKeyCounter = getContext().getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
     this.inputValueCounter = getContext().getCounters().findCounter(
         TaskCounter.REDUCE_INPUT_RECORDS);
+     this.shuffledInputs = getContext().getCounters().findCounter(
+        TaskCounter.NUM_SHUFFLED_INPUTS);
     this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, getContext().getWorkDirs());
 
     return Collections.emptyList();
@@ -264,6 +267,20 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
   }
 
   @Override
+  public float getProgress()  throws IOException, InterruptedException {
+    int totalInputs = getNumPhysicalInputs();
+    if (totalInputs != 0) {
+      synchronized (this) {
+        return ((0.5f) * this.shuffledInputs.getValue() / totalInputs) +
+            ((rawIter != null) ?
+             ((0.5f) * rawIter.getProgress().getProgress()) : 0.0f);
+      }
+    } else {
+      return 0.0f;
+    }
+  }
+
+  @Override
   public void handleEvents(List<Event> inputEvents) throws IOException {
     Shuffle shuffleLocalRef;
     synchronized (this) {

http://git-wip-us.apache.org/repos/asf/tez/blob/0daa1ec3/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
index 2345bbb..5d6668d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
@@ -250,5 +250,11 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput {
       }
     }
   }
-
+  public float getProgress() throws IOException, InterruptedException {
+    float totalProgress = 0.0f;
+    for(Input input : getInputs()) {
+      totalProgress += ((OrderedGroupedKVInput)input).getProgress();
+    }
+    return (1.0f) * totalProgress/getInputs().size();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0daa1ec3/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index ec9a191..f83b9aa 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -291,4 +291,8 @@ public class UnorderedKVInput extends AbstractLogicalInput {
     return Collections.unmodifiableSet(confKeys);
   }
 
+  @Override
+  public float getProgress() {
+    return kvReader.getProgress();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0daa1ec3/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
index 725f785..66c3625 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
@@ -17,17 +17,23 @@
  */
 package org.apache.tez.runtime.library.processor;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.Processor;
 import org.apache.tez.runtime.api.ProcessorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implements an {@link AbstractLogicalIOProcessor} and provides empty
@@ -38,9 +44,37 @@ import org.apache.tez.runtime.api.ProcessorContext;
 @Public
 @Evolving
 public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractLogicalIOProcessor.class);
   protected Map<String, LogicalInput> inputs;
   protected Map<String, LogicalOutput> outputs;
 
+  Timer progressTimer = new Timer();
+  TimerTask progressTask = new TimerTask() {
+
+    @Override
+    public void run() {
+      try {
+        float progSum = 0.0f;
+        if (getInputs() != null) {
+          for(LogicalInput input : getInputs().values()) {
+            if (input instanceof AbstractLogicalInput) {
+              progSum += ((AbstractLogicalInput) input).getProgress();
+            }
+          }
+          float progress = (1.0f) * progSum / inputs.size();
+          getContext().setProgress(progress);
+        }
+      } catch (IOException e) {
+        LOG.warn("Encountered IOException during Processor progress update"
+            + e.getMessage());
+      } catch (InterruptedException e) {
+        LOG.warn("Encountered InterruptedException during Processor progress"
+            + "update" + e.getMessage());
+      }
+    }
+  };
+
   public SimpleProcessor(ProcessorContext context) {
     super(context);
   }
@@ -72,6 +106,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
       for (LogicalInput input : getInputs().values()) {
         input.start();
       }
+      progressTimer.schedule(progressTask, 0, 100);
     }
     if (getOutputs() != null) {
       for (LogicalOutput output : getOutputs().values()) {
@@ -101,7 +136,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
 
   @Override
   public void close() throws Exception {
-
+    progressTimer.cancel();
   }
 
   public Map<String, LogicalInput> getInputs() {
@@ -112,4 +147,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
     return outputs;
   }
 
+  public Timer getProgressTimer() {
+    return progressTimer;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0daa1ec3/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
index 91dcb6d..92bbce8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
@@ -18,17 +18,21 @@
 
 package org.apache.tez.runtime.library.processor;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
 import java.util.List;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import com.google.common.base.Charsets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -47,6 +51,34 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
   private static final Logger LOG = LoggerFactory.getLogger(SleepProcessor.class);
 
   private int timeToSleepMS;
+  protected Map<String, LogicalInput> inputs;
+  protected Map<String, LogicalOutput> outputs;
+
+  Timer progressTimer = new Timer();
+  TimerTask progressTask = new TimerTask() {
+
+    @Override
+    public void run() {
+      try {
+        float progSum = 0.0f;
+        if (inputs != null) {
+          for(LogicalInput input : inputs.values()) {
+            if (input instanceof AbstractLogicalInput) {
+              progSum += ((AbstractLogicalInput) input).getProgress();
+            }
+          }
+          float progress = (1.0f) * progSum / inputs.size();
+          getContext().setProgress(progress);
+        }
+      } catch (IOException e) {
+        LOG.warn("Encountered IOException during Processor progress update" +
+            e.getMessage());
+      } catch (InterruptedException e) {
+        LOG.warn("Encountered InterruptedException during Processor progress" +
+            "update" + e.getMessage());
+      }
+    }
+  };
 
   public SleepProcessor(ProcessorContext context) {
     super(context);
@@ -69,14 +101,17 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
   }
 
   @Override
-  public void run(Map<String, LogicalInput> inputs,
-                  Map<String, LogicalOutput> outputs) throws Exception {
+  public void run(Map<String, LogicalInput> _inputs,
+                  Map<String, LogicalOutput> _outputs) throws Exception {
+    inputs = _inputs;
+    outputs = _outputs;
     LOG.info("Running the Sleep Processor, sleeping for "
       + timeToSleepMS + " ms");
-    for (LogicalInput input : inputs.values()) {
+    for (LogicalInput input : _inputs.values()) {
       input.start();
     }
-    for (LogicalOutput output : outputs.values()) {
+    progressTimer.schedule(progressTask, 0, 100);
+    for (LogicalOutput output : _outputs.values()) {
       output.start();
     }
     try {
@@ -93,7 +128,7 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
 
   @Override
   public void close() throws Exception {
-    // Nothing to cleanup
+    progressTimer.cancel();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/0daa1ec3/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
index 6103047..513c782 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
@@ -18,8 +18,11 @@
 
 package org.apache.tez.mapreduce.examples.processor;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
@@ -33,6 +36,7 @@ import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -47,6 +51,33 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor {
   private static final Logger LOG = LoggerFactory.getLogger(FilterByWordInputProcessor.class);
 
   private String filterWord;
+  protected Map<String, LogicalInput> inputs;
+  protected Map<String, LogicalOutput> outputs;
+  Timer progressTimer = new Timer();
+  TimerTask progressTask = new TimerTask() {
+
+    @Override
+    public void run() {
+      try {
+        float progSum = 0.0f;
+        if (inputs != null) {
+          for(LogicalInput input : inputs.values()) {
+            if (input instanceof AbstractLogicalInput) {
+              progSum += ((AbstractLogicalInput) input).getProgress();
+            }
+          }
+          float progress = (1.0f) * progSum / inputs.size();
+          getContext().setProgress(progress);
+        }
+      } catch (IOException e) {
+        LOG.warn("Encountered IOException during Processor progress update" +
+            e.getMessage());
+      } catch (InterruptedException e) {
+        LOG.warn("Encountered InterruptedException during Processor progress" +
+            "update" + e.getMessage());
+      }
+    }
+  };
 
   public FilterByWordInputProcessor(ProcessorContext context) {
     super(context);
@@ -70,38 +101,40 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor {
 
   @Override
   public void close() throws Exception {
-    LOG.info("Broadcast Processor closing. Nothing to do");
+    progressTimer.cancel();
   }
 
   @Override
-  public void run(Map<String, LogicalInput> inputs,
-      Map<String, LogicalOutput> outputs) throws Exception {
+  public void run(Map<String, LogicalInput> _inputs,
+      Map<String, LogicalOutput> _outputs) throws Exception {
+    this.inputs = _inputs;
+    this.outputs = _outputs;
     
-    if (inputs.size() != 1) {
+    if (_inputs.size() != 1) {
       throw new IllegalStateException("FilterByWordInputProcessor processor can only work with a single input");
     }
 
-    if (outputs.size() != 1) {
+    if (_outputs.size() != 1) {
       throw new IllegalStateException("FilterByWordInputProcessor processor can only work with a single output");
     }
     
-    for (LogicalInput input : inputs.values()) {
+    for (LogicalInput input : _inputs.values()) {
       input.start();
     }
-    for (LogicalOutput output : outputs.values()) {
+    for (LogicalOutput output : _outputs.values()) {
       output.start();
     }
 
-    LogicalInput li = inputs.values().iterator().next();
+    LogicalInput li = _inputs.values().iterator().next();
     if (! (li instanceof MRInput)) {
       throw new IllegalStateException("FilterByWordInputProcessor processor can only work with MRInput");
     }
 
-    LogicalOutput lo = outputs.values().iterator().next();
+    LogicalOutput lo = _outputs.values().iterator().next();
     if (! (lo instanceof UnorderedKVOutput)) {
       throw new IllegalStateException("FilterByWordInputProcessor processor can only work with OnFileUnorderedKVOutput");
     }
-
+    progressTimer.schedule(progressTask, 0, 100);
     MRInputLegacy mrInput = (MRInputLegacy) li;
     mrInput.init();
     UnorderedKVOutput kvOutput = (UnorderedKVOutput) lo;

http://git-wip-us.apache.org/repos/asf/tez/blob/0daa1ec3/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
index 15c17fc..5872527 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
@@ -51,6 +51,7 @@ public class FilterByWordOutputProcessor extends SimpleMRProcessor {
   @Override
   public void close() throws Exception {
     LOG.info("Broadcast Output Processor closing. Nothing to do");
+    getProgressTimer().cancel();
   }
 
   @Override


[2/2] tez git commit: TEZ-3437. Improve synchronization and the progress report behavior for Inputs from TEZ-3317. Contributed by Kuhu Shukla.

Posted by ss...@apache.org.
TEZ-3437. Improve synchronization and the progress report behavior for
Inputs from TEZ-3317. Contributed by Kuhu Shukla.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/85fdc412
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/85fdc412
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/85fdc412

Branch: refs/heads/branch-0.8
Commit: 85fdc4128e87ba7b803e79a8644b58b8d8de59a3
Parents: 0daa1ec
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Oct 26 11:03:14 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Oct 26 11:03:14 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../org/apache/tez/common/ProgressHelper.java   | 89 ++++++++++++++++++++
 .../tez/runtime/api/AbstractLogicalInput.java   |  2 +-
 .../tez/runtime/api/MergedLogicalInput.java     |  2 +-
 .../runtime/api/ProgressFailedException.java    | 46 ++++++++++
 .../org/apache/tez/mapreduce/input/MRInput.java |  9 +-
 .../tez/mapreduce/input/MRInputLegacy.java      |  3 +-
 .../mapreduce/processor/map/MapProcessor.java   | 54 +++++-------
 .../processor/reduce/ReduceProcessor.java       | 38 ++-------
 .../tez/mapreduce/processor/MapUtils.java       |  9 +-
 .../processor/map/TestMapProcessor.java         | 78 ++---------------
 .../processor/reduce/TestReduceProcessor.java   |  2 +-
 .../api/impl/TezProcessorContextImpl.java       |  2 +-
 .../common/readers/UnorderedKVReader.java       | 26 +++---
 .../common/shuffle/impl/ShuffleManager.java     |  4 +-
 .../input/ConcatenatedMergedKeyValueInput.java  | 11 ++-
 .../input/ConcatenatedMergedKeyValuesInput.java | 11 ++-
 .../library/input/OrderedGroupedKVInput.java    |  3 +-
 .../input/OrderedGroupedMergedKVInput.java      |  3 +-
 .../runtime/library/input/UnorderedKVInput.java |  9 +-
 .../library/processor/SimpleProcessor.java      | 42 ++-------
 .../library/processor/SleepProcessor.java       | 39 ++-------
 .../processor/FilterByWordInputProcessor.java   | 39 ++-------
 .../processor/FilterByWordOutputProcessor.java  |  3 +-
 24 files changed, 254 insertions(+), 272 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 06dcc0f..5c71975 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3437. Improve synchronization and the progress report behavior for Inputs from TEZ-3317.
   TEZ-3317. Speculative execution starts too early due to 0 progress.
   TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs
   TEZ-3439. Tez joinvalidate fails when first input argument size is bigger than the second.
@@ -518,6 +519,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3437. Improve synchronization and the progress report behavior for Inputs from TEZ-3317.
   TEZ-3317. Speculative execution starts too early due to 0 progress.
   TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs
   TEZ-3439. Tez joinvalidate fails when first input argument size is bigger than the second.

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java b/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java
new file mode 100644
index 0000000..407a20e
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.ProgressFailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class ProgressHelper {
+  private static final Logger LOG = LoggerFactory.getLogger(ProgressHelper.class);
+  private String processorName;
+  protected final Map<String, LogicalInput> inputs;
+  final ProcessorContext processorContext;
+
+  volatile ScheduledExecutorService scheduledExecutorService;
+  Runnable monitorProgress = new Runnable() {
+    @Override
+    public void run() {
+      try {
+        float progSum = 0.0f;
+        float progress;
+        if (inputs != null && inputs.size() != 0) {
+          for (LogicalInput input : inputs.values()) {
+            if (input instanceof AbstractLogicalInput) {
+              progSum += ((AbstractLogicalInput) input).getProgress();
+            }
+          }
+          progress = (1.0f) * progSum / inputs.size();
+        } else {
+          progress = 1.0f;
+        }
+        processorContext.setProgress(progress);
+      } catch (ProgressFailedException pe) {
+        LOG.warn("Encountered ProgressFailedException during Processor progress update"
+            + pe);
+      } catch (InterruptedException ie) {
+        LOG.warn("Encountered InterruptedException during Processor progress update"
+            + ie);
+      }
+    }
+  };
+
+  public ProgressHelper(Map<String, LogicalInput> _inputs, ProcessorContext context, String processorName) {
+    this.inputs = _inputs;
+    this.processorContext = context;
+    this.processorName = processorName;
+  }
+
+  public void scheduleProgressTaskService(long delay, long period) {
+    scheduledExecutorService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
+        .setDaemon(true).setNameFormat("TaskProgressService{" + processorName+ ":" + processorContext.getTaskVertexName()
+            + "} #%d").build());
+    scheduledExecutorService.scheduleWithFixedDelay(monitorProgress, delay, period,
+        TimeUnit.MILLISECONDS);
+  }
+
+  public void shutDownProgressTaskService() {
+    if (scheduledExecutorService != null) {
+      scheduledExecutorService.shutdownNow();
+      scheduledExecutorService = null;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
index 4c95eb9..a97f3fa 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
@@ -82,7 +82,7 @@ public abstract class AbstractLogicalInput implements LogicalInput, LogicalInput
     return inputContext;
   }
 
-  public float getProgress() throws IOException, InterruptedException {
+  public float getProgress() throws ProgressFailedException, InterruptedException {
     return 0.0f;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
index 3195a17..e3c3624 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
@@ -94,7 +94,7 @@ public abstract class MergedLogicalInput implements LogicalInput {
    */
   public abstract void setConstituentInputIsReady(Input input);
 
-  public float getProgress() throws IOException, InterruptedException {
+  public float getProgress() throws ProgressFailedException, InterruptedException {
     return 0.0f;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-api/src/main/java/org/apache/tez/runtime/api/ProgressFailedException.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/ProgressFailedException.java b/tez-api/src/main/java/org/apache/tez/runtime/api/ProgressFailedException.java
new file mode 100644
index 0000000..07995cc
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/ProgressFailedException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.tez.dag.api.TezException;
+
+@Public
+@Evolving
+/**
+ * Exception thrown when getProgress fails.
+ */
+public class ProgressFailedException extends TezException {
+
+  private static final long serialVersionUID = -114180015419275775L;
+
+  public ProgressFailedException() {
+    super("Progress update failed");
+  }
+
+  public ProgressFailedException(Throwable cause) {
+    super("Progress update failed", cause);
+  }
+
+  public ProgressFailedException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index b83d1a3..1b0ffed 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.protobuf.ByteString;
 
+import org.apache.tez.runtime.api.ProgressFailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -612,8 +613,12 @@ public class MRInput extends MRInputBase {
   }
 
   @Override
-  public float getProgress() throws IOException,InterruptedException {
-    return (mrReader != null) ? mrReader.getProgress() : 0.0f;
+  public float getProgress() throws ProgressFailedException, InterruptedException {
+    try {
+      return (mrReader != null) ? mrReader.getProgress() : 0.0f;
+    } catch (IOException e) {
+      throw new ProgressFailedException("getProgress encountered IOException ", e);
+    }
   }
 
   void processSplitEvent(InputDataInformationEvent event)

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
index 9b5ed1c..70be7ee 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.tez.runtime.api.ProgressFailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -87,7 +88,7 @@ public class MRInputLegacy extends MRInput {
     return (org.apache.hadoop.mapreduce.RecordReader) mrReader.getRecordReader();
   }
 
-  public float getProgress() throws IOException, InterruptedException {
+  public float getProgress() throws ProgressFailedException, InterruptedException {
       return super.getProgress();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index ecdacf9..4c0616e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -21,9 +21,9 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Timer;
-import java.util.TimerTask;
 
+import org.apache.tez.common.ProgressHelper;
+import org.apache.tez.runtime.api.ProgressFailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -46,7 +46,6 @@ import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
-import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
@@ -63,32 +62,7 @@ public class MapProcessor extends MRTask{
 
   protected Map<String, LogicalInput> inputs;
   protected Map<String, LogicalOutput> outputs;
-  Timer progressTimer = new Timer();
-  TimerTask progressTask = new TimerTask() {
-
-    @Override
-    public void run() {
-      try {
-        float progSum = 0.0f;
-        if (inputs != null && inputs.size() != 0) {
-          for(LogicalInput input : inputs.values()) {
-            if (input instanceof AbstractLogicalInput) {
-              progSum += ((AbstractLogicalInput) input).getProgress();
-            }
-          }
-          float progress = (1.0f) * progSum / inputs.size();
-          getContext().setProgress(progress);
-          mrReporter.setProgress(progress);
-        }
-      } catch (IOException e) {
-        LOG.warn("Encountered IOException during Processor progress update"
-            + e.getMessage());
-      } catch (InterruptedException e) {
-        LOG.warn("Encountered InterruptedException during Processor progress"
-            + "update" + e.getMessage());
-      }
-    }
-  };
+  private ProgressHelper progressHelper;
 
   public MapProcessor(ProcessorContext processorContext) {
     super(processorContext, true);
@@ -101,8 +75,9 @@ public class MapProcessor extends MRTask{
   }
 
   public void close() throws IOException {
-    progressTimer.cancel();
-
+    if (progressHelper != null) {
+      progressHelper.shutDownProgressTaskService();
+    }
   }
 
   @Override
@@ -110,6 +85,7 @@ public class MapProcessor extends MRTask{
       Map<String, LogicalOutput> _outputs) throws Exception {
     this.inputs = _inputs;
     this.outputs = _outputs;
+    progressHelper = new ProgressHelper(this.inputs, getContext(), this.getClass().getSimpleName());
     LOG.info("Running map: " + processorContext.getUniqueIdentifier());
 
     if (_inputs.size() != 1
@@ -130,7 +106,7 @@ public class MapProcessor extends MRTask{
     LogicalOutput out = _outputs.values().iterator().next();
 
     initTask(out);
-    progressTimer.schedule(progressTask, 0, 100);
+    progressHelper.scheduleProgressTaskService(0, 100);
 
     // Sanity check
     if (!(in instanceof MRInputLegacy)) {
@@ -316,7 +292,17 @@ public class MapProcessor extends MRTask{
 
     @Override
     public float getProgress() throws IOException, InterruptedException {
-      return in.getProgress();
+      try {
+        return in.getProgress();
+      } catch (ProgressFailedException e) {
+        if (e.getCause() instanceof IOException) {
+          throw (IOException)e.getCause();
+        }
+        if (e.getCause() instanceof InterruptedException) {
+          throw (InterruptedException)e.getCause();
+        }
+      }
+      throw new RuntimeException("Could not get Processor progress");
     }
 
     @Override
@@ -367,6 +353,8 @@ public class MapProcessor extends MRTask{
     public float getProgress() throws IOException {
       try {
         return mrInput.getProgress();
+      } catch (ProgressFailedException pe) {
+        throw new IOException(pe);
       } catch (InterruptedException ie) {
         throw new IOException(ie);
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 8ec6091..4b79c78 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -22,9 +22,8 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 
+import org.apache.tez.common.ProgressHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -44,7 +43,6 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
-import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -68,32 +66,7 @@ public class ReduceProcessor extends MRTask {
 
   protected Map<String, LogicalInput> inputs;
   protected Map<String, LogicalOutput> outputs;
-  Timer progressTimer = new Timer();
-  TimerTask progressTask = new TimerTask() {
-
-    @Override
-    public void run() {
-      try {
-        float progSum = 0.0f;
-        if (inputs != null && inputs.size() != 0) {
-          for(LogicalInput input : inputs.values()) {
-            if (input instanceof AbstractLogicalInput) {
-              progSum += ((AbstractLogicalInput) input).getProgress();
-            }
-          }
-          float progress = (1.0f) * progSum / inputs.size();
-          getContext().setProgress(progress);
-          mrReporter.setProgress(progress);
-        }
-      } catch (IOException e) {
-        LOG.warn("Encountered IOException during Processor progress update"
-            + e.getMessage());
-      } catch (InterruptedException e) {
-        LOG.warn("Encountered InterruptedException during Processor progress"
-            + "update" + e.getMessage());
-      }
-    }
-  };
+  private ProgressHelper progressHelper;
 
   public ReduceProcessor(ProcessorContext processorContext) {
     super(processorContext, false);
@@ -106,7 +79,9 @@ public class ReduceProcessor extends MRTask {
   }
 
   public void close() throws IOException {
-    progressTimer.cancel();
+    if (progressHelper != null) {
+      progressHelper.shutDownProgressTaskService();
+    }
 
   }
 
@@ -115,6 +90,7 @@ public class ReduceProcessor extends MRTask {
       Map<String, LogicalOutput> _outputs) throws Exception {
     this.inputs = _inputs;
     this.outputs = _outputs;
+    progressHelper = new ProgressHelper(this.inputs, processorContext, this.getClass().getSimpleName());
     LOG.info("Running reduce: " + processorContext.getUniqueIdentifier());
 
     if (_outputs.size() <= 0 || _outputs.size() > 1) {
@@ -139,7 +115,7 @@ public class ReduceProcessor extends MRTask {
     out.start();
 
     initTask(out);
-    progressTimer.schedule(progressTask, 0, 100);
+    progressHelper.scheduleProgressTaskService(0, 100);
     this.statusUpdate();
 
     Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 8309966..b69dc0c 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -118,7 +118,7 @@ public class MapUtils {
   }
   
   private static InputSplit 
-  createInputSplit(FileSystem fs, Path workDir, JobConf job, Path file) 
+  createInputSplit(FileSystem fs, Path workDir, JobConf job, Path file, int numKVs)
       throws IOException {
     FileInputFormat.setInputPaths(job, workDir);
 
@@ -132,7 +132,7 @@ public class MapUtils {
       Random r = new Random(System.currentTimeMillis());
       LongWritable key = new LongWritable();
       Text value = new Text();
-      for (int i = 10; i > 0; i--) {
+      for (int i = numKVs; i > 0; i--) {
         key.set(r.nextInt(1000));
         value.set(Integer.toString(i));
         writer.append(key, value);
@@ -189,9 +189,10 @@ public class MapUtils {
     outMeta.close();
   }
 
-  public static void generateInputSplit(FileSystem fs, Path workDir, JobConf jobConf, Path mapInput) throws IOException {
+  public static void generateInputSplit(FileSystem fs, Path workDir, JobConf jobConf, Path mapInput,
+                                        int numKVs) throws IOException {
     jobConf.setInputFormat(SequenceFileInputFormat.class);
-    InputSplit split = createInputSplit(fs, workDir, jobConf, mapInput);
+    InputSplit split = createInputSplit(fs, workDir, jobConf, mapInput, numKVs);
     writeSplitFiles(fs, jobConf, split);
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 53b8c46..3243de5 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.db.FloatSplitter;
 import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
 import org.junit.Assert;
@@ -149,7 +150,7 @@ public class TestMapProcessor {
     Path mapInput = new Path(workDir, "map0");
     
     
-    MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput);
+    MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput, 10);
 
     InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
         InputDescriptor.create(MRInputLegacy.class.getName())
@@ -221,7 +222,7 @@ public class TestMapProcessor {
     Path mapInput = new Path(workDir, "map0");
 
 
-    generateInputSplit(localFs, workDir, jobConf, mapInput);
+    MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput, 100000);
 
     InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
         InputDescriptor.create(MRInputLegacy.class.getName())
@@ -246,84 +247,17 @@ public class TestMapProcessor {
       @Override
       public void run() {
         float prog = task.getProgress();
-        if(prog > 0.0 && prog < 1.0)
+        if(prog > 0.0f && prog < 1.0f)
           progressUpdate = prog;
       }
     });
 
     task.initialize();
-    scheduler.scheduleAtFixedRate(monitorProgress, 0, 10,
+    scheduler.scheduleAtFixedRate(monitorProgress, 0, 1,
         TimeUnit.MILLISECONDS);
     task.run();
     Assert.assertTrue("Progress Updates should be captured!",
-        progressUpdate != 0.0f);
+        progressUpdate > 0.0f && progressUpdate < 1.0f);
     task.close();
   }
-
-  public static void generateInputSplit(FileSystem fs, Path workDir,
-                                        JobConf jobConf, Path mapInput)
-      throws IOException {
-    jobConf.setInputFormat(SequenceFileInputFormat.class);
-    FileInputFormat.setInputPaths(jobConf, workDir);
-
-    LOG.info("Generating data at path: " + mapInput);
-    // create a file with length entries
-    SequenceFile.Writer writer =
-        SequenceFile.createWriter(fs, jobConf, mapInput,
-            LongWritable.class, Text.class);
-    try {
-      Random r = new Random(System.currentTimeMillis());
-      LongWritable key = new LongWritable();
-      Text value = new Text();
-      for (int i = 100000; i > 0; i--) {
-        key.set(r.nextInt(1000));
-        value.set(Integer.toString(i));
-        writer.append(key, value);
-        LOG.info("<k, v> : <" + key.get() + ", " + value + ">");
-      }
-    } finally {
-      writer.close();
-    }
-
-    SequenceFileInputFormat<LongWritable, Text> format =
-        new SequenceFileInputFormat<LongWritable, Text>();
-    InputSplit[] splits = format.getSplits(jobConf, 1);
-    System.err.println("#split = " + splits.length + " ; " +
-        "#locs = " + splits[0].getLocations().length + "; " +
-        "loc = " + splits[0].getLocations()[0] + "; " +
-        "off = " + splits[0].getLength() + "; " +
-        "file = " + ((FileSplit)splits[0]).getPath());
-    writeSplitFiles(fs, jobConf, splits[0]);
-  }
-
-  private static void writeSplitFiles(FileSystem fs, JobConf conf,
-                                      InputSplit split) throws IOException {
-    Path jobSplitFile = new Path(conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR,
-        MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR_DEFAULT), MRJobConfig.JOB_SPLIT);
-    LOG.info("Writing split to: " + jobSplitFile);
-    FSDataOutputStream out = FileSystem.create(fs, jobSplitFile,
-        new FsPermission(JOB_FILE_PERMISSION));
-
-    long offset = out.getPos();
-    Text.writeString(out, split.getClass().getName());
-    split.write(out);
-    out.close();
-
-    String[] locations = split.getLocations();
-
-    JobSplit.SplitMetaInfo info = null;
-    info = new JobSplit.SplitMetaInfo(locations, offset, split.getLength());
-
-    Path jobSplitMetaInfoFile = new Path(
-        conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR),
-        MRJobConfig.JOB_SPLIT_METAINFO);
-
-    FSDataOutputStream outMeta = FileSystem.create(fs, jobSplitMetaInfoFile,
-        new FsPermission(JOB_FILE_PERMISSION));
-    outMeta.write(SplitMetaInfoReaderTez.META_SPLIT_FILE_HEADER);
-    WritableUtils.writeVInt(outMeta, SplitMetaInfoReaderTez.META_SPLIT_VERSION);
-    WritableUtils.writeVInt(outMeta, 1); // Only 1 split meta info being written
-    info.write(outMeta);
-    outMeta.close();
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 1922c53..ca3792f 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -139,7 +139,7 @@ public class TestReduceProcessor {
     jobConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
     
     Path mapInput = new Path(workDir, "map0");
-    MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput);
+    MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput, 10);
 
     InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
         InputDescriptor.create(MRInputLegacy.class.getName())

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index c00b977..d03f48e 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -92,7 +92,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
 
   @Override
   public void setProgress(float progress) {
-    if (runtimeTask.getProgress() != progress) {
+    if (Math.abs(progress - runtimeTask.getProgress()) >= 0.001f) {
       runtimeTask.setProgress(progress);
       notifyProgress();
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
index 3f44c4f..f4400db 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
@@ -19,6 +19,7 @@
 package org.apache.tez.runtime.library.common.readers;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.IOInterruptedException;
@@ -73,8 +74,8 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
   // TODO Remove this once per I/O counters are separated properly. Relying on
   // the counter at the moment will generate aggregate numbers. 
   private int numRecordsRead = 0;
-  private long totalBytesRead = 0;
-  private long totalFileBytes = 0;
+  private final AtomicLong totalBytesRead = new AtomicLong(0);
+  private final AtomicLong totalFileBytes = new AtomicLong(0);
 
 
   public UnorderedKVReader(ShuffleManager shuffleManager, Configuration conf,
@@ -148,16 +149,15 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
     return value;
   }
 
-  public float getProgress() {
-    int numInputs = shuffleManager.getNumInputs();
-    if (totalFileBytes > 0 && numInputs > 0) {
-      return ((1.0f) * (totalBytesRead + ((currentReader != null) ? currentReader.bytesRead :
-      0l)) /
-          totalFileBytes) * (
-          shuffleManager.getNumCompletedInputs().floatValue() /
-              (1.0f * numInputs));
+  public float getProgress() throws IOException, InterruptedException {
+    final int numInputs = shuffleManager.getNumInputs();
+    if (totalFileBytes.get() > 0 && numInputs > 0) {
+      return ((1.0f) * (totalBytesRead.get() + ((currentReader != null) ? currentReader.bytesRead :
+      0.0f)) /
+          totalFileBytes.get()) * (shuffleManager.getNumCompletedInputsFloat() /
+          (1.0f * numInputs));
     }
-    return 0l;
+    return 0.0f;
   }
   /**
    * Tries reading the next key and value from the current reader.
@@ -189,7 +189,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
    */
   private boolean moveToNextInput() throws IOException {
     if (currentReader != null) { // Close the current reader.
-      totalBytesRead += currentReader.bytesRead;
+      totalBytesRead.getAndAdd(currentReader.bytesRead);
       currentReader.close();
       /**
        * clear reader explicitly. Otherwise this could point to stale reference when next() is
@@ -210,7 +210,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
       return false; // No more inputs
     } else {
       currentReader = openIFileReader(currentFetchedInput);
-      totalFileBytes += currentReader.getLength();
+      totalFileBytes.getAndAdd(currentReader.getLength());
       return true;
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 4702e23..0cb17e6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -867,8 +867,8 @@ public class ShuffleManager implements FetcherCallback {
     return numInputs;
   }
 
-  public AtomicInteger getNumCompletedInputs() {
-    return numCompletedInputs;
+  public float getNumCompletedInputsFloat() {
+    return numCompletedInputs.floatValue();
   }
 
   /////////////////// End of methods for walking the available inputs

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
index 743b628..a0059cf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
@@ -26,6 +26,7 @@ import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.MergedLogicalInput;
+import org.apache.tez.runtime.api.ProgressFailedException;
 import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.api.MergedInputContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
@@ -89,7 +90,7 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
       return currentReader.getCurrentValue();
     }
 
-    public float getProgress() {
+    public float getProgress() throws IOException, InterruptedException {
       return (1.0f)*(currentReaderIndex + 1)/getInputs().size();
     }
   }
@@ -110,7 +111,11 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
   }
 
   @Override
-  public float getProgress()  throws IOException, InterruptedException {
-    return concatenatedMergedKeyValueReader.getProgress();
+  public float getProgress() throws ProgressFailedException, InterruptedException {
+    try {
+      return concatenatedMergedKeyValueReader.getProgress();
+    } catch (IOException e) {
+      throw new ProgressFailedException("getProgress encountered IOException ", e);
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
index fa51f47..2555a57 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
@@ -26,6 +26,7 @@ import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.MergedLogicalInput;
+import org.apache.tez.runtime.api.ProgressFailedException;
 import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.api.MergedInputContext;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
@@ -91,7 +92,7 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
       return currentReader.getCurrentValues();
     }
 
-    public float getProgress() {
+    public float getProgress() throws IOException, InterruptedException {
       return (1.0f)*(currentReaderIndex + 1)/getInputs().size();
     }
   }
@@ -112,7 +113,11 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
   }
 
   @Override
-  public float getProgress()  throws IOException, InterruptedException {
-    return concatenatedMergedKeyValuesReader.getProgress();
+  public float getProgress() throws ProgressFailedException, InterruptedException {
+    try {
+      return concatenatedMergedKeyValuesReader.getProgress();
+    } catch (IOException e) {
+      throw new ProgressFailedException("getProgress encountered IOException ", e);
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index c86e2fb..8e653ed 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -28,6 +28,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.tez.runtime.api.ProgressFailedException;
 import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.apache.tez.runtime.library.common.Constants;
 import org.slf4j.Logger;
@@ -267,7 +268,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
   }
 
   @Override
-  public float getProgress()  throws IOException, InterruptedException {
+  public float getProgress() throws ProgressFailedException, InterruptedException {
     int totalInputs = getNumPhysicalInputs();
     if (totalInputs != 0) {
       synchronized (this) {

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
index 5d6668d..49d4043 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Set;
 
+import org.apache.tez.runtime.api.ProgressFailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -250,7 +251,7 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput {
       }
     }
   }
-  public float getProgress() throws IOException, InterruptedException {
+  public float getProgress() throws ProgressFailedException, InterruptedException {
     float totalProgress = 0.0f;
     for(Input input : getInputs()) {
       totalProgress += ((OrderedGroupedKVInput)input).getProgress();

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index f83b9aa..62d7b99 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -25,6 +25,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.runtime.api.ProgressFailedException;
 import org.apache.tez.runtime.library.common.Constants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -292,7 +293,11 @@ public class UnorderedKVInput extends AbstractLogicalInput {
   }
 
   @Override
-  public float getProgress() {
-    return kvReader.getProgress();
+  public float getProgress() throws ProgressFailedException, InterruptedException {
+    try {
+      return kvReader.getProgress();
+    } catch (IOException e) {
+      throw new ProgressFailedException("getProgress encountered IOException ", e);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
index 66c3625..c237bc1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
@@ -17,15 +17,12 @@
  */
 package org.apache.tez.runtime.library.processor;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
-import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.common.ProgressHelper;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -49,31 +46,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
   protected Map<String, LogicalInput> inputs;
   protected Map<String, LogicalOutput> outputs;
 
-  Timer progressTimer = new Timer();
-  TimerTask progressTask = new TimerTask() {
-
-    @Override
-    public void run() {
-      try {
-        float progSum = 0.0f;
-        if (getInputs() != null) {
-          for(LogicalInput input : getInputs().values()) {
-            if (input instanceof AbstractLogicalInput) {
-              progSum += ((AbstractLogicalInput) input).getProgress();
-            }
-          }
-          float progress = (1.0f) * progSum / inputs.size();
-          getContext().setProgress(progress);
-        }
-      } catch (IOException e) {
-        LOG.warn("Encountered IOException during Processor progress update"
-            + e.getMessage());
-      } catch (InterruptedException e) {
-        LOG.warn("Encountered InterruptedException during Processor progress"
-            + "update" + e.getMessage());
-      }
-    }
-  };
+  protected ProgressHelper progressHelper;
 
   public SimpleProcessor(ProcessorContext context) {
     super(context);
@@ -83,6 +56,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
       throws Exception {
     this.inputs = _inputs;
     this.outputs = _outputs;
+    progressHelper = new ProgressHelper(this.inputs, getContext(),this.getClass().getSimpleName());
     preOp();
     run();
     postOp();
@@ -106,7 +80,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
       for (LogicalInput input : getInputs().values()) {
         input.start();
       }
-      progressTimer.schedule(progressTask, 0, 100);
+      progressHelper.scheduleProgressTaskService(0, 100);
     }
     if (getOutputs() != null) {
       for (LogicalOutput output : getOutputs().values()) {
@@ -136,7 +110,9 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
 
   @Override
   public void close() throws Exception {
-    progressTimer.cancel();
+    if( progressHelper != null) {
+      progressHelper.shutDownProgressTaskService();
+    }
   }
 
   public Map<String, LogicalInput> getInputs() {
@@ -146,8 +122,4 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
   public Map<String, LogicalOutput> getOutputs() {
     return outputs;
   }
-
-  public Timer getProgressTimer() {
-    return progressTimer;
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
index 92bbce8..3efcd21 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
@@ -18,21 +18,18 @@
 
 package org.apache.tez.runtime.library.processor;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 
 import com.google.common.base.Charsets;
+import org.apache.tez.common.ProgressHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -53,32 +50,7 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
   private int timeToSleepMS;
   protected Map<String, LogicalInput> inputs;
   protected Map<String, LogicalOutput> outputs;
-
-  Timer progressTimer = new Timer();
-  TimerTask progressTask = new TimerTask() {
-
-    @Override
-    public void run() {
-      try {
-        float progSum = 0.0f;
-        if (inputs != null) {
-          for(LogicalInput input : inputs.values()) {
-            if (input instanceof AbstractLogicalInput) {
-              progSum += ((AbstractLogicalInput) input).getProgress();
-            }
-          }
-          float progress = (1.0f) * progSum / inputs.size();
-          getContext().setProgress(progress);
-        }
-      } catch (IOException e) {
-        LOG.warn("Encountered IOException during Processor progress update" +
-            e.getMessage());
-      } catch (InterruptedException e) {
-        LOG.warn("Encountered InterruptedException during Processor progress" +
-            "update" + e.getMessage());
-      }
-    }
-  };
+  private ProgressHelper progressHelper;
 
   public SleepProcessor(ProcessorContext context) {
     super(context);
@@ -105,12 +77,13 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
                   Map<String, LogicalOutput> _outputs) throws Exception {
     inputs = _inputs;
     outputs = _outputs;
+    progressHelper = new ProgressHelper(this.inputs, getContext(),this.getClass().getSimpleName());
     LOG.info("Running the Sleep Processor, sleeping for "
       + timeToSleepMS + " ms");
     for (LogicalInput input : _inputs.values()) {
       input.start();
     }
-    progressTimer.schedule(progressTask, 0, 100);
+    progressHelper.scheduleProgressTaskService(0, 100);
     for (LogicalOutput output : _outputs.values()) {
       output.start();
     }
@@ -128,7 +101,9 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
 
   @Override
   public void close() throws Exception {
-    progressTimer.cancel();
+    if (progressHelper != null) {
+      progressHelper.shutDownProgressTaskService();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
index 513c782..15d6e82 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
@@ -18,12 +18,10 @@
 
 package org.apache.tez.mapreduce.examples.processor;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 
+import org.apache.tez.common.ProgressHelper;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +34,6 @@ import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
-import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -53,31 +50,7 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor {
   private String filterWord;
   protected Map<String, LogicalInput> inputs;
   protected Map<String, LogicalOutput> outputs;
-  Timer progressTimer = new Timer();
-  TimerTask progressTask = new TimerTask() {
-
-    @Override
-    public void run() {
-      try {
-        float progSum = 0.0f;
-        if (inputs != null) {
-          for(LogicalInput input : inputs.values()) {
-            if (input instanceof AbstractLogicalInput) {
-              progSum += ((AbstractLogicalInput) input).getProgress();
-            }
-          }
-          float progress = (1.0f) * progSum / inputs.size();
-          getContext().setProgress(progress);
-        }
-      } catch (IOException e) {
-        LOG.warn("Encountered IOException during Processor progress update" +
-            e.getMessage());
-      } catch (InterruptedException e) {
-        LOG.warn("Encountered InterruptedException during Processor progress" +
-            "update" + e.getMessage());
-      }
-    }
-  };
+  private ProgressHelper progressHelper;
 
   public FilterByWordInputProcessor(ProcessorContext context) {
     super(context);
@@ -101,7 +74,9 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor {
 
   @Override
   public void close() throws Exception {
-    progressTimer.cancel();
+    if (progressHelper != null) {
+      progressHelper.shutDownProgressTaskService();
+    }
   }
 
   @Override
@@ -109,7 +84,7 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor {
       Map<String, LogicalOutput> _outputs) throws Exception {
     this.inputs = _inputs;
     this.outputs = _outputs;
-    
+    this.progressHelper = new ProgressHelper(this.inputs, getContext(),this.getClass().getSimpleName());
     if (_inputs.size() != 1) {
       throw new IllegalStateException("FilterByWordInputProcessor processor can only work with a single input");
     }
@@ -134,7 +109,7 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor {
     if (! (lo instanceof UnorderedKVOutput)) {
       throw new IllegalStateException("FilterByWordInputProcessor processor can only work with OnFileUnorderedKVOutput");
     }
-    progressTimer.schedule(progressTask, 0, 100);
+    progressHelper.scheduleProgressTaskService(0, 100);
     MRInputLegacy mrInput = (MRInputLegacy) li;
     mrInput.init();
     UnorderedKVOutput kvOutput = (UnorderedKVOutput) lo;

http://git-wip-us.apache.org/repos/asf/tez/blob/85fdc412/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
index 5872527..7acaf7e 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
@@ -20,6 +20,7 @@ package org.apache.tez.mapreduce.examples.processor;
 
 import java.util.List;
 
+import org.apache.tez.common.ProgressHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.tez.mapreduce.output.MROutput;
@@ -51,7 +52,7 @@ public class FilterByWordOutputProcessor extends SimpleMRProcessor {
   @Override
   public void close() throws Exception {
     LOG.info("Broadcast Output Processor closing. Nothing to do");
-    getProgressTimer().cancel();
+    super.close();
   }
 
   @Override