You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/11/13 14:54:30 UTC

[1/2] tez git commit: TEZ-2918. Make progress notifications in IOs (bikas)

Repository: tez
Updated Branches:
  refs/heads/master 344a8cc14 -> 5ec498d8f


http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
index 0de400e..c8ae67b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
@@ -22,21 +22,19 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.hadoop.io.RawComparator;
-import org.apache.tez.runtime.InputReadyTracker;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
-import org.apache.tez.runtime.api.MergedLogicalInput;
 import org.apache.tez.runtime.api.MergedInputContext;
 import org.apache.tez.runtime.api.Reader;
-import org.apache.tez.runtime.api.impl.TezMergedInputContextImpl;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 import org.junit.Test;
@@ -44,8 +42,7 @@ import org.junit.Test;
 public class TestSortedGroupedMergedInput {
 
   MergedInputContext createMergedInputContext() {
-    return new TezMergedInputContextImpl(null, "mergedInputName", new HashMap<String, MergedLogicalInput>(),
-        mock(InputReadyTracker.class), null);
+    return mock(MergedInputContext.class);
   }
   
   @Test(timeout = 5000)
@@ -67,7 +64,8 @@ public class TestSortedGroupedMergedInput {
     sInputs.add(sInput1);
     sInputs.add(sInput2);
     sInputs.add(sInput3);
-    OrderedGroupedMergedKVInput input = new OrderedGroupedMergedKVInput(createMergedInputContext(), sInputs);
+    MergedInputContext mockContext = createMergedInputContext();
+    OrderedGroupedMergedKVInput input = new OrderedGroupedMergedKVInput(mockContext, sInputs);
 
     KeyValuesReader kvsReader = input.getReader();
     int keyCount = 0;
@@ -84,6 +82,7 @@ public class TestSortedGroupedMergedInput {
       }
       assertEquals(6, valCount);
     }
+    verify(mockContext, times(4)).notifyProgress(); // one for each reader change and one to exit
 
     getNextFromFinishedReader(kvsReader);
   }
@@ -120,7 +119,8 @@ public class TestSortedGroupedMergedInput {
     sInputs.add(sInput2);
     sInputs.add(sInput3);
 
-    OrderedGroupedMergedKVInput input = new OrderedGroupedMergedKVInput(createMergedInputContext(), sInputs);
+    OrderedGroupedMergedKVInput input = new OrderedGroupedMergedKVInput(
+        createMergedInputContext(), sInputs);
 
     KeyValuesReader kvsReader = input.getReader();
     int keyCount = 0;
@@ -384,8 +384,8 @@ public class TestSortedGroupedMergedInput {
     sInputs.add(sInput1);
     sInputs.add(sInput2);
     sInputs.add(sInput3);
-    ConcatenatedMergedKeyValueInput input =
-        new ConcatenatedMergedKeyValueInput(createMergedInputContext(), sInputs);
+    MergedInputContext mockContext = createMergedInputContext();
+    ConcatenatedMergedKeyValueInput input = new ConcatenatedMergedKeyValueInput(mockContext, sInputs);
 
     KeyValueReader kvReader = input.getReader();
     int keyCount = 0;
@@ -395,6 +395,7 @@ public class TestSortedGroupedMergedInput {
       Integer value = (Integer) kvReader.getCurrentValue();
     }
     assertTrue(keyCount == 30);
+    verify(mockContext, times(4)).notifyProgress(); // one for each reader change and one to exit
 
     getNextFromFinishedReader(kvReader);
   }
@@ -418,8 +419,8 @@ public class TestSortedGroupedMergedInput {
     sInputs.add(sInput1);
     sInputs.add(sInput2);
     sInputs.add(sInput3);
-    ConcatenatedMergedKeyValuesInput input =
-        new ConcatenatedMergedKeyValuesInput(createMergedInputContext(), sInputs);
+    MergedInputContext mockContext = createMergedInputContext();
+    ConcatenatedMergedKeyValuesInput input = new ConcatenatedMergedKeyValuesInput(mockContext, sInputs);
 
     KeyValuesReader kvsReader = input.getReader();
     int keyCount = 0;
@@ -435,6 +436,7 @@ public class TestSortedGroupedMergedInput {
       assertEquals(2, valCount);
     }
     assertEquals(9, keyCount);
+    verify(mockContext, times(4)).notifyProgress(); // one for each reader change and one to exit
 
     getNextFromFinishedReader(kvsReader);
   }


[2/2] tez git commit: TEZ-2918. Make progress notifications in IOs (bikas)

Posted by bi...@apache.org.
TEZ-2918. Make progress notifications in IOs (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5ec498d8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5ec498d8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5ec498d8

Branch: refs/heads/master
Commit: 5ec498d8f5b6cecf4eaee8f995ea2cf9ca2acfcc
Parents: 344a8cc
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Nov 13 05:54:09 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Nov 13 05:54:09 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/runtime/api/MergedInputContext.java     |   5 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  14 +-
 .../org/apache/tez/mapreduce/input/MRInput.java |  10 +-
 .../tez/mapreduce/input/MultiMRInput.java       |   4 +-
 .../org/apache/tez/mapreduce/lib/MRReader.java  |  16 ++
 .../tez/mapreduce/lib/MRReaderMapReduce.java    |  10 +-
 .../tez/mapreduce/lib/MRReaderMapred.java       |  11 +-
 .../apache/tez/mapreduce/output/MROutput.java   |   1 +
 .../apache/tez/mapreduce/input/TestMRInput.java |   3 +
 .../tez/mapreduce/input/TestMultiMRInput.java   |   4 +
 .../tez/mapreduce/lib/TestKVReadersWithMR.java  |  12 +-
 .../tez/mapreduce/output/TestMROutput.java      | 161 +++++++++++++++++++
 .../runtime/LogicalIOProcessorRuntimeTask.java  |   2 +-
 .../org/apache/tez/runtime/RuntimeTask.java     |   9 +-
 .../api/impl/TezMergedInputContextImpl.java     |  11 +-
 .../runtime/api/impl/TezTaskContextImpl.java    |   2 +-
 .../tez/runtime/TestInputReadyTracker.java      |   6 +-
 .../common/readers/UnorderedKVReader.java       |   8 +-
 .../library/common/shuffle/ShuffleUtils.java    |   1 +
 .../common/shuffle/impl/ShuffleManager.java     |   2 +
 .../shuffle/orderedgrouped/MergeManager.java    |  35 ++--
 .../common/shuffle/orderedgrouped/Shuffle.java  |   4 +-
 .../orderedgrouped/ShuffleScheduler.java        |   3 +-
 .../common/sort/impl/ExternalSorter.java        |  11 +-
 .../common/sort/impl/PipelinedSorter.java       |  10 +-
 .../common/sort/impl/dflt/DefaultSorter.java    |  10 +-
 .../writers/UnorderedPartitionedKVWriter.java   |   9 +-
 .../input/ConcatenatedMergedKeyValueInput.java  |   2 +
 .../input/ConcatenatedMergedKeyValuesInput.java |   2 +
 .../library/input/OrderedGroupedKVInput.java    |   8 +-
 .../input/OrderedGroupedMergedKVInput.java      |   8 +-
 .../runtime/library/input/UnorderedKVInput.java |   3 +-
 .../common/readers/TestUnorderedKVReader.java   |   5 +-
 .../orderedgrouped/TestMergeManager.java        |   3 +
 .../orderedgrouped/TestShuffleScheduler.java    |   2 +
 .../common/sort/impl/TestPipelinedSorter.java   |  38 ++---
 .../sort/impl/dflt/TestDefaultSorter.java       |   2 +
 .../TestUnorderedPartitionedKVWriter.java       |   4 +-
 .../input/TestSortedGroupedMergedInput.java     |  26 +--
 40 files changed, 383 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3ce1640..52c73da 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-2679. Admin forms of launch env settings
 
 ALL CHANGES:
+  TEZ-2918. Make progress notifications in IOs
   TEZ-2940. Invalid shuffle max slow start setting causes vertex to hang indefinitely
   TEZ-2930. Tez UI: Parent controller is not polling at times
   TEZ-1670. Add tests for all converter functions in HistoryEventTimelineConversion.

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java
index 41c519b..65bb087 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java
@@ -42,6 +42,11 @@ public interface MergedInputContext {
   public void inputIsReady();
   
   /**
+   * Inform the framework that progress has been made
+   */
+  public void notifyProgress();
+  
+  /**
    * Get the work directories for the Input
    * @return an array of work dirs
    */

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 27eb69b..bfd1634 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -193,6 +193,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   org.apache.tez.runtime.api.impl.TaskStatistics statistics;
   
   long lastNotifyProgressTimestamp = 0;
+  private final long hungIntervalMax;
 
   // Used to store locality information when
   Set<String> taskHosts = new HashSet<String>();
@@ -488,6 +489,10 @@ public class TaskAttemptImpl implements TaskAttempt,
     this.taskResource = resource;
     this.containerContext = containerContext;
     this.leafVertex = leafVertex;
+    this.hungIntervalMax = conf.getLong(
+        TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, 
+        TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS_DEFAULT);
+
   }
 
   @Override
@@ -1378,14 +1383,11 @@ public class TaskAttemptImpl implements TaskAttempt,
         ta.lastNotifyProgressTimestamp = ta.clock.getTime();
       } else {
         long currTime = ta.clock.getTime();
-        long hungIntervalMax = ta.conf.getLong(
-            TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, 
-            TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS_DEFAULT);
-        if (hungIntervalMax > 0 &&
-            currTime - ta.lastNotifyProgressTimestamp > hungIntervalMax) {
+        if (ta.hungIntervalMax > 0 &&
+            currTime - ta.lastNotifyProgressTimestamp > ta.hungIntervalMax) {
           // task is hung
           String diagnostics = "Attempt failed because it appears to make no progress for " + 
-          hungIntervalMax + "ms";
+          ta.hungIntervalMax + "ms";
           LOG.info(diagnostics + " " + ta.getID());
           // send event that will fail this attempt
           ta.sendEvent(

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 93161cb..b68d135 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -461,9 +461,10 @@ public class MRInput extends MRInputBase {
           mrReader = new MRReaderMapReduce(jobConf, getContext().getCounters(), inputRecordCounter,
               getContext().getApplicationId().getClusterTimestamp(), getContext()
                   .getTaskVertexIndex(), getContext().getApplicationId().getId(), getContext()
-                  .getTaskIndex(), getContext().getTaskAttemptNumber());
+                  .getTaskIndex(), getContext().getTaskAttemptNumber(), getContext());
         } else {
-          mrReader = new MRReaderMapred(jobConf, getContext().getCounters(), inputRecordCounter);
+          mrReader = new MRReaderMapred(jobConf, getContext().getCounters(), inputRecordCounter, 
+              getContext());
         }
       } else {
         TaskSplitMetaInfo[] allMetaInfo = MRInputUtils.readSplits(jobConf);
@@ -477,14 +478,14 @@ public class MRInput extends MRInputBase {
           mrReader = new MRReaderMapReduce(jobConf, newInputSplit, getContext().getCounters(),
               inputRecordCounter, getContext().getApplicationId().getClusterTimestamp(),
               getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(),
-              getContext().getTaskIndex(), getContext().getTaskAttemptNumber());
+              getContext().getTaskIndex(), getContext().getTaskAttemptNumber(), getContext());
         } else {
           org.apache.hadoop.mapred.InputSplit oldInputSplit = MRInputUtils
               .getOldSplitDetailsFromDisk(splitMetaInfo, jobConf, getContext().getCounters()
                   .findCounter(TaskCounter.SPLIT_RAW_BYTES));
           mrReader =
               new MRReaderMapred(jobConf, oldInputSplit, getContext().getCounters(),
-                  inputRecordCounter);
+                  inputRecordCounter, getContext());
         }
       }
     } finally {
@@ -508,6 +509,7 @@ public class MRInput extends MRInputBase {
       return new KeyValueReader() {
         @Override
         public boolean next() throws IOException {
+          getContext().notifyProgress();
           return false;
         }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
index 4a792dc..2b60f29 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
@@ -177,7 +177,7 @@ public class MultiMRInput extends MRInputBase {
           getContext().getCounters(), inputRecordCounter, getContext().getApplicationId()
           .getClusterTimestamp(), getContext().getTaskVertexIndex(), getContext()
           .getApplicationId().getId(), getContext().getTaskIndex(), getContext()
-          .getTaskAttemptNumber());
+          .getTaskAttemptNumber(), getContext());
       if (LOG.isDebugEnabled()) {
         LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
             split.getClass().getName() + ", NewSplit: " + split);
@@ -186,7 +186,7 @@ public class MultiMRInput extends MRInputBase {
     } else {
       split = MRInputUtils.getOldSplitDetailsFromEvent(splitProto, localJobConf);
       reader = new MRReaderMapred(localJobConf, (org.apache.hadoop.mapred.InputSplit) split,
-          getContext().getCounters(), inputRecordCounter);
+          getContext().getCounters(), inputRecordCounter, getContext());
       if (LOG.isDebugEnabled()) {
         LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
             split.getClass().getName() + ", OldSplit: " + split);

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java
index 8a20827..aa35fec 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java
@@ -21,14 +21,30 @@ package org.apache.tez.mapreduce.lib;
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
 @Private
 public abstract class MRReader extends KeyValueReader {
+  
+  private final InputContext context;
+  
   public abstract void setSplit(Object split) throws IOException;
   public abstract boolean isSetup();
   public abstract float getProgress() throws IOException, InterruptedException;
   public abstract void close() throws IOException;
   public abstract Object getSplit();
   public abstract Object getRecordReader();
+  
+  public MRReader(InputContext context) {
+    this.context = context;
+  }
+  
+  protected final void notifyProgress() {
+    context.notifyProgress();
+  }
+  
+  protected final void notifyDone() {
+    context.notifyProgress();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
index 5fc3e49..10b871e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
@@ -20,6 +20,7 @@ package org.apache.tez.mapreduce.lib;
 
 import java.io.IOException;
 
+import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,15 +52,16 @@ public class MRReaderMapReduce extends MRReader {
   private boolean setupComplete = false;
 
   public MRReaderMapReduce(JobConf jobConf, TezCounters tezCounters, TezCounter inputRecordCounter,
-      long clusterId, int vertexIndex, int appId, int taskIndex, int taskAttemptNumber)
+      long clusterId, int vertexIndex, int appId, int taskIndex, int taskAttemptNumber, InputContext context)
       throws IOException {
     this(jobConf, null, tezCounters, inputRecordCounter, clusterId, vertexIndex, appId, taskIndex,
-        taskAttemptNumber);
+        taskAttemptNumber, context);
   }
 
   public MRReaderMapReduce(JobConf jobConf, InputSplit inputSplit, TezCounters tezCounters,
       TezCounter inputRecordCounter, long clusterId, int vertexIndex, int appId, int taskIndex,
-      int taskAttemptNumber) throws IOException {
+      int taskAttemptNumber, InputContext context) throws IOException {
+    super(context);
     this.inputRecordCounter = inputRecordCounter;
     this.taskAttemptContext = new TaskAttemptContextImpl(jobConf, tezCounters, clusterId,
         vertexIndex, appId, taskIndex, taskAttemptNumber, true, null);
@@ -121,9 +123,11 @@ public class MRReaderMapReduce extends MRReader {
     }
     if (hasNext) {
       inputRecordCounter.increment(1);
+      notifyProgress();
     } else {
       hasCompletedProcessing();
       completedProcessing = true;
+      notifyDone();
     }
     return hasNext;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
index 1bf71f6..d81debb 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
@@ -33,6 +33,7 @@ import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
 import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.runtime.api.InputContext;
 
 import com.google.common.base.Preconditions;
 
@@ -56,13 +57,15 @@ public class MRReaderMapred extends MRReader {
 
   private boolean setupComplete = false;
 
-  public MRReaderMapred(JobConf jobConf, TezCounters tezCounters, TezCounter inputRecordCounter)
+  public MRReaderMapred(JobConf jobConf, TezCounters tezCounters, TezCounter inputRecordCounter, 
+      InputContext context)
       throws IOException {
-    this(jobConf, null, tezCounters, inputRecordCounter);
+    this(jobConf, null, tezCounters, inputRecordCounter, context);
   }
 
   public MRReaderMapred(JobConf jobConf, InputSplit inputSplit, TezCounters tezCounters,
-      TezCounter inputRecordCounter) throws IOException {
+      TezCounter inputRecordCounter, InputContext context) throws IOException {
+    super(context);
     this.jobConf = jobConf;
     this.tezCounters = tezCounters;
     this.inputRecordCounter = inputRecordCounter;
@@ -113,9 +116,11 @@ public class MRReaderMapred extends MRReader {
     boolean hasNext = recordReader.next(key, value);
     if (hasNext) {
       inputRecordCounter.increment(1);
+      notifyProgress();
     } else {
       hasCompletedProcessing();
       completedProcessing = true;
+      notifyDone();
     }
     // The underlying reader does not throw InterruptedExceptions. Cannot convert to an
     // IOInterruptedException without checking the interrupt flag on each request, which is also

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 12e5092..ec83bf5 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -510,6 +510,7 @@ public class MROutput extends AbstractLogicalOutput {
           oldRecordWriter.write(key, value);
         }
         outputRecordCounter.increment(1);
+        getContext().notifyProgress();
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
index 50114b9..448b90c 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
@@ -19,6 +19,8 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.util.LinkedList;
@@ -57,6 +59,7 @@ public class TestMRInput {
     mrInput.start();
 
     assertFalse(mrInput.getReader().next());
+    verify(inputContext, times(1)).notifyProgress();
 
     List<Event> events = new LinkedList<>();
     try {

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
index 80e3e77..db5643e 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -155,9 +157,11 @@ public class TestMultiMRInput {
     input.handleEvents(eventList);
 
     int readerCount = 0;
+    int recordCount = 0;
     for (KeyValueReader reader : input.getKeyValueReaders()) {
       readerCount++;
       while (reader.next()) {
+        verify(inputContext, times(++recordCount) ).notifyProgress();
         if (data1.size() == 0) {
           fail("Found more records than expected");
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
index 65f5ad0..dad18de 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.InputContext;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -32,6 +33,9 @@ import java.io.IOException;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 public class TestKVReadersWithMR {
 
@@ -60,12 +64,14 @@ public class TestKVReadersWithMR {
   }
 
   public void testWithSpecificNumberOfKV(int kvPairs) throws IOException {
-    MRReaderMapred reader = new MRReaderMapred(conf, counters, inputRecordCounter);
+    InputContext mockContext = mock(InputContext.class);
+    MRReaderMapred reader = new MRReaderMapred(conf, counters, inputRecordCounter, mockContext);
 
     reader.recordReader = new DummyRecordReader(kvPairs);
     int records = 0;
     while (reader.next()) {
       records++;
+      verify(mockContext, times(records)).notifyProgress();
     }
     assertTrue(kvPairs == records);
 
@@ -80,13 +86,15 @@ public class TestKVReadersWithMR {
   }
 
   public void testWithSpecificNumberOfKV_MapReduce(int kvPairs) throws IOException {
+    InputContext mockContext = mock(InputContext.class);
     MRReaderMapReduce reader = new MRReaderMapReduce(conf, counters, inputRecordCounter, -1, 1,
-        10, 20, 30);
+        10, 20, 30, mockContext);
 
     reader.recordReader = new DummyRecordReaderMapReduce(kvPairs);
     int records = 0;
     while (reader.next()) {
       records++;
+      verify(mockContext, times(records)).notifyProgress();
     }
     assertTrue(kvPairs == records);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
index b898fe0..0129a8b 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
@@ -20,21 +20,52 @@ package org.apache.tez.mapreduce.output;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.HashMap;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.mapreduce.TestUmbilical;
+import org.apache.tez.mapreduce.TezTestUtils;
 import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+import org.junit.Ignore;
 import org.junit.Test;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+
 
 public class TestMROutput {
 
@@ -144,4 +175,134 @@ public class TestMROutput {
     when(outputContext.getCounters()).thenReturn(new TezCounters());
     return outputContext;
   }
+  
+  public static LogicalIOProcessorRuntimeTask createLogicalTask(
+      Configuration conf,
+      TezUmbilical umbilical, String dagName,
+      String vertexName) throws Exception {
+    ProcessorDescriptor procDesc = ProcessorDescriptor.create(TestProcessor.class.getName());
+    List<InputSpec> inputSpecs = Lists.newLinkedList();
+    List<OutputSpec> outputSpecs = Lists.newLinkedList();
+    outputSpecs.add(new OutputSpec("Null",
+        MROutput.createConfigBuilder(conf, TestOutputFormat.class).build().getOutputDescriptor(), 1));
+    
+    TaskSpec taskSpec = new TaskSpec(
+        TezTestUtils.getMockTaskAttemptId(0, 0, 0, 0),
+        dagName, vertexName, -1,
+        procDesc,
+        inputSpecs,
+        outputSpecs, null);
+
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path workDir =
+        new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+                 "TestMapOutput").makeQualified(fs.getUri(), fs.getWorkingDirectory());
+
+    LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(
+        taskSpec,
+        0,
+        conf,
+        new String[] {workDir.toString()},
+        umbilical,
+        null,
+        new HashMap<String, String>(),
+        HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
+        Runtime.getRuntime().maxMemory(), true);
+    return task;
+  }
+  
+  public static class TestOutputCommitter extends OutputCommitter {
+
+    @Override
+    public void setupJob(JobContext jobContext) throws IOException {
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskContext) throws IOException {
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
+      return false;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskContext) throws IOException {
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskContext) throws IOException {
+    }
+    
+  }
+  
+  public static class TestOutputFormat extends OutputFormat<String, String> {
+    public static class TestRecordWriter extends RecordWriter<String, String> {
+      Writer writer;
+      boolean doWrite;
+      TestRecordWriter(boolean write) throws IOException {
+        this.doWrite = write;
+        if (doWrite) {
+          File f = File.createTempFile("test", null);
+          f.deleteOnExit();
+          writer = new BufferedWriter(new FileWriter(f));
+        }
+      }
+      
+      @Override
+      public void write(String key, String value) throws IOException, InterruptedException {
+        if (doWrite) {
+          writer.write(key);
+          writer.write(value);
+        }
+      }
+
+      @Override
+      public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+        writer.close();
+      }
+      
+    }
+    
+    @Override
+    public RecordWriter<String, String> getRecordWriter(TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      return new TestRecordWriter(true);
+    }
+
+    @Override
+    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      return new TestOutputCommitter();
+    }
+  }
+
+  public static class TestProcessor extends SimpleProcessor {
+    public TestProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      KeyValueWriter writer = (KeyValueWriter) getOutputs().values().iterator().next().getWriter();
+      for (int i=0; i<1000000; ++i) {
+        writer.write("key", "value");
+      }
+    }
+
+  }
+
+  @Ignore
+  @Test
+  public void testPerf() throws Exception {
+    LogicalIOProcessorRuntimeTask task = createLogicalTask(new Configuration(), 
+        new TestUmbilical(), "dag", "vertex");
+    task.initialize();
+    task.run();
+    task.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 4b00c97..6b9b016 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -535,7 +535,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
        }
        MergedInputContext mergedInputContext =
            new TezMergedInputContextImpl(groupInputSpec.getMergedInputDescriptor().getUserPayload(),
-               groupInputSpec.getGroupName(), groupInputsMap, inputReadyTracker, localDirs);
+               groupInputSpec.getGroupName(), groupInputsMap, inputReadyTracker, localDirs, this);
        List<Input> inputs = Lists.newArrayListWithCapacity(groupInputSpec.getGroupVertices().size());
        for (String groupVertex : groupInputSpec.getGroupVertices()) {
          inputs.add(inputsMap.get(groupVertex));

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index 23e57b1..529dde0 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -54,7 +54,7 @@ public abstract class RuntimeTask {
   private final AtomicBoolean taskDone;
   private final TaskCounterUpdater counterUpdater;
   private final TaskStatistics statistics;
-  private volatile boolean progressNotified;
+  private final AtomicBoolean progressNotified = new AtomicBoolean(false);
 
   protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
       TezUmbilical tezUmbilical, String pid, boolean setupSysCounterUpdater) {
@@ -105,13 +105,12 @@ public abstract class RuntimeTask {
     this.fatalErrorMessage = message;
   }
   
-  public void notifyProgressInvocation() {
-    progressNotified = true;
+  public final void notifyProgressInvocation() {
+    progressNotified.lazySet(true);
   }
   
   public boolean getAndClearProgressNotification() {
-    boolean retVal = progressNotified;
-    progressNotified = false;
+    boolean retVal = progressNotified.getAndSet(false);
     return retVal;
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
index 74592c6..e35e332 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
@@ -27,6 +27,7 @@ import java.util.Map;
 
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.InputReadyTracker;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.runtime.api.MergedLogicalInput;
 import org.apache.tez.runtime.api.MergedInputContext;
 
@@ -38,10 +39,12 @@ public class TezMergedInputContextImpl implements MergedInputContext {
   private final Map<String, MergedLogicalInput> groupInputsMap;
   private final InputReadyTracker inputReadyTracker;
   private final String[] workDirs;
+  private final LogicalIOProcessorRuntimeTask runtimeTask;
 
   public TezMergedInputContextImpl(@Nullable UserPayload userPayload, String groupInputName,
                                    Map<String, MergedLogicalInput> groupInputsMap,
-                                   InputReadyTracker inputReadyTracker, String[] workDirs) {
+                                   InputReadyTracker inputReadyTracker, String[] workDirs,
+                                   LogicalIOProcessorRuntimeTask runtimeTask) {
     checkNotNull(groupInputName, "groupInputName is null");
     checkNotNull(groupInputsMap, "input-group map is null");
     checkNotNull(inputReadyTracker, "inputReadyTracker is null");
@@ -50,6 +53,7 @@ public class TezMergedInputContextImpl implements MergedInputContext {
     this.userPayload = userPayload;
     this.inputReadyTracker = inputReadyTracker;
     this.workDirs = workDirs;
+    this.runtimeTask = runtimeTask;
   }
 
   @Override
@@ -67,4 +71,9 @@ public class TezMergedInputContextImpl implements MergedInputContext {
     return Arrays.copyOf(workDirs, workDirs.length);
   }
 
+  @Override
+  public final void notifyProgress() {
+    runtimeTask.notifyProgressInvocation();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index 211f9d7..c12b334 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -174,7 +174,7 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
   }
 
   @Override
-  public void notifyProgress() {
+  public final void notifyProgress() {
     runtimeTask.notifyProgressInvocation();
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
index a77e38f..29c5023 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
@@ -156,8 +156,10 @@ public class TestInputReadyTracker {
     group2Inputs.add(input4);
 
     Map<String, MergedLogicalInput> mergedInputMap = new HashMap<String, MergedLogicalInput>();
-    MergedInputContext mergedInputContext1 = new TezMergedInputContextImpl(null, "group1", mergedInputMap, inputReadyTracker, null);
-    MergedInputContext mergedInputContext2 = new TezMergedInputContextImpl(null, "group2", mergedInputMap, inputReadyTracker, null);
+    MergedInputContext mergedInputContext1 = new TezMergedInputContextImpl(
+        null, "group1", mergedInputMap, inputReadyTracker, null, null);
+    MergedInputContext mergedInputContext2 = new TezMergedInputContextImpl(
+        null, "group2", mergedInputMap, inputReadyTracker, null, null);
 
     AnyOneMergedInputForTest group1 = new AnyOneMergedInputForTest(mergedInputContext1, group1Inputs);
     AllMergedInputForTest group2 = new AllMergedInputForTest(mergedInputContext2, group2Inputs);

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
index a8dd1b2..57bb121 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
@@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.common.readers;
 
 import java.io.IOException;
 
+import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,6 +62,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
   private final int ifileBufferSize;
   
   private final TezCounter inputRecordCounter;
+  private final InputContext context;
   
   private K key;
   private V value;
@@ -75,10 +77,10 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
 
   public UnorderedKVReader(ShuffleManager shuffleManager, Configuration conf,
       CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize,
-      TezCounter inputRecordCounter)
+      TezCounter inputRecordCounter, InputContext context)
       throws IOException {
     this.shuffleManager = shuffleManager;
-
+    this.context = context;
     this.codec = codec;
     this.ifileReadAhead = ifileReadAhead;
     this.ifileReadAheadLength = ifileReadAheadLength;
@@ -113,6 +115,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
   public boolean next() throws IOException {
     if (readNextFromCurrentReader()) {
       inputRecordCounter.increment(1);
+      context.notifyProgress();
       numRecordsRead++;
       return true;
     } else {
@@ -120,6 +123,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
       while (nextInputExists) {
         if(readNextFromCurrentReader()) {
           inputRecordCounter.increment(1);
+          context.notifyProgress();
           numRecordsRead++;
           return true;
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 818cfaa..431ba38 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -397,6 +397,7 @@ public class ShuffleUtils {
       @Nullable long[] partitionStats) throws IOException {
     Preconditions.checkArgument(eventList != null, "EventList can't be null");
 
+    context.notifyProgress();
     if (finalMergeEnabled) {
       Preconditions.checkArgument(isLastEvent, "Can not send multiple events when final merge is "
           + "enabled");

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index a7c1c59..b3e050a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -604,6 +604,7 @@ public class ShuffleManager implements FetcherCallback {
       lock.unlock();
     }
     
+    inputContext.notifyProgress();
     boolean committed = false;
     if (!completedInputSet.contains(inputIdentifier)) {
       synchronized (completedInputSet) {
@@ -760,6 +761,7 @@ public class ShuffleManager implements FetcherCallback {
         + "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: "
         + connectFailed);
     failedShufflesCounter.increment(1);
+    inputContext.notifyProgress();
     if (srcAttemptIdentifier == null) {
       reportFatalError(null, "Received fetchFailure for an unknown src (null)");
     } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index fb9b243..61ff338 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -84,7 +84,12 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
   private final LocalDirAllocator localDirAllocator;
   
   private final  TezTaskOutputFiles mapOutputFile;
-  private final Progressable nullProgressable = new NullProgressable();
+  private final Progressable progressable = new Progressable() {
+    @Override
+    public void progress() {
+      inputContext.notifyProgress();
+    }
+  };
   private final Combiner combiner;  
   
   private final Set<MapOutput> inMemoryMergedMapOutputs = 
@@ -624,6 +629,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
         return;
       }
 
+      inputContext.notifyProgress();
 
       InputAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier(); 
       List<Segment> inMemorySegments = new ArrayList<Segment>();
@@ -650,8 +656,8 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
                        inMemorySegments, inMemorySegments.size(),
                        new Path(inputContext.getUniqueIdentifier()),
                        (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
-                       nullProgressable, null, null, null, null); 
-      TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+                       progressable, null, null, null, null); 
+      TezMerger.writeFile(rIter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
       writer.close();
 
       LOG.info(inputContext.getSourceVertexName() +
@@ -697,6 +703,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
       }
 
       numMemToDiskMerges.increment(1);
+      inputContext.notifyProgress();
       
       //name this output file same as the name of the first file that is 
       //there in the current list of inmem files (this is guaranteed to
@@ -716,7 +723,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
 
       // TODO Maybe track serialized vs deserialized bytes.
       
-      // All disk writes done by this merge are overhead - due to the lac of
+      // All disk writes done by this merge are overhead - due to the lack of
       // adequate memory to keep all segments in memory.
       outputPath = mapOutputFile.getInputFileForWrite(
           srcTaskIdentifier.getInputIdentifier().getInputIndex(), srcTaskIdentifier.getSpillEventId(),
@@ -742,13 +749,13 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
             (Class)ConfigUtils.getIntermediateInputValueClass(conf),
             inMemorySegments, inMemorySegments.size(),
             tmpDir, (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
-            nullProgressable, spilledRecordsCounter, null, additionalBytesRead, null);
+            progressable, spilledRecordsCounter, null, additionalBytesRead, null);
         // spilledRecordsCounter is tracking the number of keys that will be
         // read from each of the segments being merged - which is essentially
         // what will be written to disk.
 
         if (null == combiner) {
-          TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+          TezMerger.writeFile(rIter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
         } else {
           // TODO Counters for Combine
           runCombineProcessor(rIter, writer);
@@ -818,7 +825,8 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
         return;
       }
       numDiskToDiskMerges.increment(1);
-      
+      inputContext.notifyProgress();
+
       long approxOutputSize = 0;
       int bytesPerSum = 
         conf.getInt("io.bytes.per.checksum", 512);
@@ -879,13 +887,13 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
             inputSegments,
             ioSortFactor, tmpDir,
             (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
-            nullProgressable, true, spilledRecordsCounter, null,
+            progressable, true, spilledRecordsCounter, null,
             mergedMapOutputsCounter, null);
 
         // TODO Maybe differentiate between data written because of Merges and
         // the finalMerge (i.e. final mem available may be different from
         // initial merge mem)
-        TezMerger.writeFile(iter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+        TezMerger.writeFile(iter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
         writer.close();
         additionalBytesWritten.increment(writer.getCompressedLength());
       } catch (IOException e) {
@@ -1010,6 +1018,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
       }
     }
     
+    inputContext.notifyProgress();
 
     // merge config params
     Class keyClass = (Class)ConfigUtils.getIntermediateInputKeyClass(job);
@@ -1046,12 +1055,12 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
           mapOutputFile.getInputFileForWrite(srcTaskId, Integer.MAX_VALUE,
               inMemToDiskBytes).suffix(Constants.MERGED_OUTPUT_PREFIX);
         final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs, keyClass, valueClass,
-            memDiskSegments, numMemDiskSegments, tmpDir, comparator, nullProgressable,
+            memDiskSegments, numMemDiskSegments, tmpDir, comparator, progressable,
             spilledRecordsCounter, null, additionalBytesRead, null);
         final Writer writer = new Writer(job, fs, outputPath,
             keyClass, valueClass, codec, null, null);
         try {
-          TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+          TezMerger.writeFile(rIter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
         } catch (IOException e) {
           if (null != outputPath) {
             try {
@@ -1127,7 +1136,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
       TezRawKeyValueIterator diskMerge = TezMerger.merge(
           job, fs, keyClass, valueClass, codec, diskSegments,
           ioSortFactor, numInMemSegments, tmpDir, comparator,
-          nullProgressable, false, spilledRecordsCounter, null, additionalBytesRead, null);
+          progressable, false, spilledRecordsCounter, null, additionalBytesRead, null);
       diskSegments.clear();
       if (0 == finalSegments.size()) {
         return diskMerge;
@@ -1138,7 +1147,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
     // This is doing nothing but creating an iterator over the segments.
     return TezMerger.merge(job, fs, keyClass, valueClass,
                  finalSegments, finalSegments.size(), tmpDir,
-                 comparator, nullProgressable, spilledRecordsCounter, null,
+                 comparator, progressable, spilledRecordsCounter, null,
                  additionalBytesRead, null);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index b5dcd4c..de3b2cb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -309,6 +309,7 @@ public class Shuffle implements ExceptionReporter {
 
       // Finish the on-going merges...
       TezRawKeyValueIterator kvIter = null;
+      inputContext.notifyProgress();
       try {
         kvIter = merger.close();
       } catch (Throwable e) {
@@ -317,7 +318,8 @@ public class Shuffle implements ExceptionReporter {
         throw new ShuffleError("Error while doing final merge ", e);
       }
       mergePhaseTime.setValue(System.currentTimeMillis() - startTime);
-
+      
+      inputContext.notifyProgress();
       // Sanity check
       synchronized (Shuffle.this) {
         if (throwable.get() != null) {

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 22da46c..dcfb274 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -466,6 +466,7 @@ class ShuffleScheduler {
                                          boolean isLocalFetch
                                          ) throws IOException {
 
+    inputContext.notifyProgress();
     if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) {
       if (!isLocalFetch) {
         /**
@@ -626,7 +627,7 @@ class ShuffleScheduler {
                                       boolean connectError,
                                       boolean isLocalFetch) {
     failedShuffleCounter.increment(1);
-
+    inputContext.notifyProgress();
     int failures = incrementAndGetFailureAttempt(srcAttempt);
 
     if (!isLocalFetch) {

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index aa521ea..7a2dc68 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -59,7 +59,6 @@ import org.apache.tez.runtime.library.common.combine.Combiner;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
-import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
 
 import com.google.common.base.Preconditions;
 
@@ -72,6 +71,7 @@ public abstract class ExternalSorter {
     spillFileIndexPaths.clear();
     spillFilePaths.clear();
     reportStatistics();
+    outputContext.notifyProgress();
   }
 
   public abstract void flush() throws IOException;
@@ -86,7 +86,13 @@ public abstract class ExternalSorter {
     }
   }
 
-  protected final Progressable nullProgressable = new NullProgressable();
+  protected final Progressable progressable = new Progressable() {
+    @Override
+    public void progress() {
+      outputContext.notifyProgress();
+    }
+  };
+
   protected final OutputContext outputContext;
   protected final Combiner combiner;
   protected final Partitioner partitioner;
@@ -298,6 +304,7 @@ public abstract class ExternalSorter {
   protected void runCombineProcessor(TezRawKeyValueIterator kvIter,
       Writer writer) throws IOException {
     try {
+      outputContext.notifyProgress();
       combiner.combine(kvIter, writer);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 2d53a2e..33a65d2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -429,6 +429,7 @@ public class PipelinedSorter extends ExternalSorter {
     span.kvmeta.put(valstart);
     span.kvmeta.put(valend - valstart);
     mapOutputRecordCounter.increment(1);
+    outputContext.notifyProgress();
     mapOutputByteCounter.increment(valend - keystart);
   }
 
@@ -545,6 +546,7 @@ public class PipelinedSorter extends ExternalSorter {
         if (isThreadInterrupted()) {
           return false;
         }
+        outputContext.notifyProgress();
         TezRawKeyValueIterator kvIter = merger.filter(i);
         //write merged output to disk
         long segmentStart = out.getPos();
@@ -611,6 +613,7 @@ public class PipelinedSorter extends ExternalSorter {
   public void flush() throws IOException {
     final String uniqueIdentifier = outputContext.getUniqueIdentifier();
 
+    outputContext.notifyProgress();
     /**
      * Possible that the thread got interrupted when flush was happening or when the flush was
      * never invoked. As a part of cleanup activity in TezTaskRunner, it would invoke close()
@@ -698,6 +701,7 @@ public class PipelinedSorter extends ExternalSorter {
         }
         numShuffleChunks.setValue(numSpills);
         fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
+        // ??? why are events not being sent here?
         return;
       }
 
@@ -742,7 +746,7 @@ public class PipelinedSorter extends ExternalSorter {
             segmentList, mergeFactor,
             new Path(uniqueIdentifier),
             (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf),
-            nullProgressable, sortSegments, true,
+            progressable, sortSegments, true,
             null, spilledRecordsCounter, additionalSpillBytesRead,
             null); // Not using any Progress in TezMerger. Should just work.
         //write merged output to disk
@@ -751,7 +755,7 @@ public class PipelinedSorter extends ExternalSorter {
             new Writer(conf, finalOut, keyClass, valClass, codec,
                 spilledRecordsCounter, null, merger.needsRLE());
         if (combiner == null || numSpills < minSpillsForCombine) {
-          TezMerger.writeFile(kvIter, writer, nullProgressable,
+          TezMerger.writeFile(kvIter, writer, progressable,
               TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
         } else {
           runCombineProcessor(kvIter, writer);
@@ -893,7 +897,7 @@ public class PipelinedSorter extends ExternalSorter {
     public SpanIterator sort(IndexedSorter sorter) {
       long start = System.currentTimeMillis();
       if(length() > 1) {
-        sorter.sort(this, 0, length(), nullProgressable);
+        sorter.sort(this, 0, length(), progressable);
       }
       LOG.info(outputContext.getDestinationVertexName() + ": " + "done sorting span=" + index + ", length=" + length() + ", "
           + "time=" + (System.currentTimeMillis() - start));

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index a833228..67da617 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -347,6 +347,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
       int valend = bb.markRecord();
 
       mapOutputRecordCounter.increment(1);
+      outputContext.notifyProgress();
       mapOutputByteCounter.increment(
           distanceTo(keystart, valend, bufvoid));
 
@@ -662,6 +663,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
   @Override
   public void flush() throws IOException {
     LOG.info(outputContext.getDestinationVertexName() + ": " + "Starting flush of map output");
+    outputContext.notifyProgress();
     if (Thread.currentThread().isInterrupted()) {
       /**
        * Possible that the thread got interrupted when flush was happening or when the flush was
@@ -710,6 +712,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
           sameKeyCount = sameKey;
           totalKeysCount = totalKeys;
         }
+        outputContext.notifyProgress();
         sortAndSpill(sameKeyCount, totalKeysCount);
       }
     } catch (InterruptedException e) {
@@ -835,7 +838,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
       throws IOException, InterruptedException {
     final int mstart = getMetaStart();
     final int mend = getMetaEnd();
-    sorter.sort(this, mstart, mend, nullProgressable);
+    sorter.sort(this, mstart, mend, progressable);
     spill(mstart, mend, sameKeyCount, totalKeysCount);
   }
 
@@ -1281,6 +1284,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
         List<Segment> segmentList =
           new ArrayList<Segment>(numSpills);
         for(int i = 0; i < numSpills; i++) {
+          outputContext.notifyProgress();
           TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
 
           Segment s =
@@ -1309,7 +1313,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
                        segmentList, mergeFactor,
                        new Path(taskIdentifier),
                        (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
-                       nullProgressable, sortSegments, true,
+                       progressable, sortSegments, true,
                        null, spilledRecordsCounter, additionalSpillBytesRead,
                        null); // Not using any Progress in TezMerger. Should just work.
 
@@ -1320,7 +1324,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
                 spilledRecordsCounter, null);
         if (combiner == null || numSpills < minSpillsForCombine) {
           TezMerger.writeFile(kvIter, writer,
-              nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+              progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
         } else {
           runCombineProcessor(kvIter, writer);
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 70b345f..ce410be 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -248,8 +248,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       throw new IOException("Exception during spill", new IOException(spillException));
     }
     if (skipBuffers) {
-      //special case, where we have only one partition and pipeliing is disabled.
-      writer.append(key, value);
+      //special case, where we have only one partition and pipelining is disabled.
+      writer.append(key, value); // ???? Why is outputrecordscounter not updated here?
+      outputContext.notifyProgress();
     } else {
       int partition = partitioner.getPartition(key, value, numPartitions);
       write(key, value, partition);
@@ -319,6 +320,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     outputRecordBytesCounter.increment(currentBuffer.nextPosition - (metaStart + META_SIZE));
     outputBytesWithOverheadCounter.increment((currentBuffer.nextPosition - metaStart) + metaSkip);
     outputRecordsCounter.increment(1);
+    outputContext.notifyProgress();
     currentBuffer.partitionPositions[partition] = metaStart;
     currentBuffer.recordsPerPartition[partition]++;
     currentBuffer.numRecords++;
@@ -407,6 +409,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       long compressedLength = 0;
       for (int i = 0; i < numPartitions; i++) {
         IFile.Writer writer = null;
+        outputContext.notifyProgress();
         try {
           long segmentStart = out.getPos();
           if (wrappedBuffer.partitionPositions[i] == WrappedBuffer.PARTITION_ABSENT_POSITION) {
@@ -556,6 +559,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       boolean isLastSpill, String pathComponent, BitSet emptyPartitions)
       throws IOException {
 
+    outputContext.notifyProgress();
     DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
         .newBuilder();
 
@@ -710,6 +714,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
           }
           synchronized (spillInfoList) {
             for (SpillInfo spillInfo : spillInfoList) {
+              outputContext.notifyProgress();
               TezIndexRecord indexRecord = spillInfo.spillRecord.getIndex(i);
               if (indexRecord.getPartLength() == 0) {
                 // Skip empty partitions within a spill

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
index 45784d9..0b8ed21 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
@@ -53,6 +53,7 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
         if (currentReaderIndex == getInputs().size()) {
           hasCompletedProcessing();
           completedProcessing = true;
+          getContext().notifyProgress();
           return false;
         }
         try {
@@ -63,6 +64,7 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
           }
           currentReader = (KeyValueReader) reader;
           currentReaderIndex++;
+          getContext().notifyProgress();
         } catch (Exception e) {
           // An InterruptedException is not expected here since this works off of
           // underlying readers which take care of throwing IOInterruptedExceptions

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
index 27ff324..4a8969e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
@@ -54,6 +54,7 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
         if (currentReaderIndex == getInputs().size()) {
           hasCompletedProcessing();
           completedProcessing = true;
+          getContext().notifyProgress();
           return false;
         }
         try {
@@ -64,6 +65,7 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
           }
           currentReader = (KeyValuesReader) reader;
           currentReaderIndex++;
+          getContext().notifyProgress();
         } catch (Exception e) {
           // An InterruptedException is not expected here since this works off of
           // underlying readers which take care of throwing IOInterruptedExceptions

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index 39cc471..5e367cf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -229,6 +229,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
         return new KeyValuesReader() {
           @Override
           public boolean next() throws IOException {
+            getContext().notifyProgress();
             hasCompletedProcessing();
             completedProcessing = true;
             return false;
@@ -259,7 +260,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
     synchronized(this) {
       valuesIter = vIter;
     }
-    return new OrderedGroupedKeyValuesReader(valuesIter);
+    return new OrderedGroupedKeyValuesReader(valuesIter, getContext());
   }
 
   @Override
@@ -307,13 +308,16 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
   private static class OrderedGroupedKeyValuesReader extends KeyValuesReader {
 
     private final ValuesIterator valuesIter;
+    private final InputContext context;
 
-    OrderedGroupedKeyValuesReader(ValuesIterator valuesIter) {
+    OrderedGroupedKeyValuesReader(ValuesIterator valuesIter, InputContext context) {
       this.valuesIter = valuesIter;
+      this.context = context;
     }
 
     @Override
     public boolean next() throws IOException {
+      context.notifyProgress();
       return valuesIter.moveToNext();
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
index 41ca7c9..2345bbb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
@@ -60,7 +60,7 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput {
    */
   @Override
   public KeyValuesReader getReader() throws Exception {
-    return new OrderedGroupedMergedKeyValuesReader(getInputs());
+    return new OrderedGroupedMergedKeyValuesReader(getInputs(), getContext());
   }
 
   @Override
@@ -81,8 +81,10 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput {
     private final ValuesIterable currentValues;
     private KeyValuesReader nextKVReader;
     private Object currentKey;
+    private final MergedInputContext context;
 
-    public OrderedGroupedMergedKeyValuesReader(List<Input> inputs) throws Exception {
+    public OrderedGroupedMergedKeyValuesReader(List<Input> inputs, MergedInputContext context) 
+        throws Exception {
       keyComparator = ((OrderedGroupedKVInput) inputs.get(0))
           .getInputKeyComparator();
       pQueue = new PriorityQueue<KeyValuesReader>(inputs.size(),
@@ -95,6 +97,7 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput {
         }
       }
       currentValues = new ValuesIterable();
+      this.context = context;
     }
 
     private void advanceAndAddToQueue(KeyValuesReader kvsReadr)
@@ -122,6 +125,7 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput {
       finishedReaders.clear();
 
       nextKVReader = pQueue.poll();
+      context.notifyProgress();
       if (nextKVReader != null) {
         currentKey = nextKVReader.getCurrentKey();
         currentValues.moveToNext();

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index fad164f..dbbe23f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -169,6 +169,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
       return new KeyValueReader() {
         @Override
         public boolean next() throws IOException {
+          getContext().notifyProgress();
           hasCompletedProcessing();
           completedProcessing = true;
           return false;
@@ -241,7 +242,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
       int ifileBufferSize, boolean ifileReadAheadEnabled, int ifileReadAheadLength)
       throws IOException {
     return new UnorderedKVReader(shuffleManager, conf, codec, ifileReadAheadEnabled,
-        ifileReadAheadLength, ifileBufferSize, inputRecordCounter);
+        ifileReadAheadLength, ifileBufferSize, inputRecordCounter, getContext());
   }
 
   private static final Set<String> confKeys = new HashSet<String>();

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
index 80bdc42..c49a423 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
@@ -125,7 +126,7 @@ public class TestUnorderedKVReader {
     }).when(manager).getNextInput();
 
     unorderedKVReader = new UnorderedKVReader<Text, Text>(manager,
-        defaultConf, null, false, -1, -1, inputRecords);
+        defaultConf, null, false, -1, -1, inputRecords, mock(InputContext.class));
   }
 
   private void createIFile(Path path, int recordCount) throws IOException {
@@ -177,7 +178,7 @@ public class TestUnorderedKVReader {
     TezCounter inputRecords = counters.findCounter(TaskCounter.INPUT_RECORDS_PROCESSED);
     UnorderedKVReader<Text, Text> reader =
         new UnorderedKVReader<Text, Text>(shuffleManager, defaultConf, null, false, -1, -1,
-            inputRecords);
+            inputRecords, mock(InputContext.class));
 
     try {
       reader.next();

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 214ec45..b8f99de 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -328,6 +329,8 @@ public class TestMergeManager {
     assertEquals(m1Prefix, m2Prefix);
     assertNotEquals(m1Prefix, m3Prefix);
     assertNotEquals(m2Prefix, m3Prefix);
+    
+    verify(inputContext, atLeastOnce()).notifyProgress();
 
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index 3fe540c..1a6c3be 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -152,6 +152,8 @@ public class TestShuffleScheduler {
         scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput, false);
         scheduler.freeHost(mapHosts[i]);
       }
+      
+      verify(inputContext, atLeast(numInputs)).notifyProgress();
 
       // Ensure the executor exits, and without an error.
       executorFuture.get();

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 2cebea4..70819e5 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.tez.runtime.library.common.sort.impl;
 
 import com.google.common.collect.Maps;
@@ -39,30 +57,13 @@ import java.util.TreeMap;
 import java.util.UUID;
 
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
 
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 public class TestPipelinedSorter {
   private static FileSystem localFs = null;
   private static Path workDir = null;
@@ -417,6 +418,7 @@ public class TestPipelinedSorter {
     //Verify dataset
     verifyData(reader);
     reader.close();
+    verify(outputContext, atLeastOnce()).notifyProgress();
   }
 
   private void verifyCounters(PipelinedSorter sorter, OutputContext context) {

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index b531464..e0374a3 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -429,6 +430,7 @@ public class TestDefaultSorter {
     TezCounter outputBytesWithOverheadCounter = context.getCounters().findCounter
         (TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
     assertTrue(outputBytesWithOverheadCounter.getValue() > 0);
+    verify(context, atLeastOnce()).notifyProgress();
   }
 
   private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 1a10eb8..e7a2125 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -25,7 +25,6 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.atMost;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -585,6 +584,8 @@ public class TestUnorderedPartitionedKVWriter {
       assertTrue(eventProto.hasPathComponent());
     }
 
+    verify(outputContext, atLeast(1)).notifyProgress();
+
     // Verify if all spill files are available.
     TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId);
 
@@ -784,6 +785,7 @@ public class TestUnorderedPartitionedKVWriter {
       expectedValues.remove(i);
     }
     assertEquals(0, expectedValues.size());
+    verify(outputContext, atLeast(1)).notifyProgress();
   }
 
   private static String createRandomString(int size) {