You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/12/14 03:20:54 UTC

tez git commit: TEZ-2496. Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source. (rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/branch-0.7 3e30e9498 -> 62e4b6ea1


TEZ-2496. Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source. (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/62e4b6ea
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/62e4b6ea
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/62e4b6ea

Branch: refs/heads/branch-0.7
Commit: 62e4b6ea1cd4021791ae3b133e8e80c9a82456d6
Parents: 3e30e94
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Mon Dec 14 07:50:22 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Mon Dec 14 07:50:22 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 pom.xml                                         |   5 +
 tez-runtime-library/findbugs-exclude.xml        |   9 +
 tez-runtime-library/pom.xml                     |   4 +
 .../vertexmanager/ShuffleVertexManager.java     | 178 ++++++++++++++++---
 .../library/api/TezRuntimeConfiguration.java    |   9 +
 .../library/common/shuffle/ShuffleUtils.java    |  37 +++-
 .../common/sort/impl/ExternalSorter.java        |  27 +++
 .../common/sort/impl/PipelinedSorter.java       |  16 +-
 .../common/sort/impl/dflt/DefaultSorter.java    |  23 ++-
 .../output/OrderedPartitionedKVOutput.java      |  12 +-
 .../runtime/library/utils/DATA_RANGE_IN_MB.java |  49 +++++
 .../src/main/proto/ShufflePayloads.proto        |   1 +
 .../vertexmanager/TestShuffleVertexManager.java | 150 +++++++++++++++-
 .../common/shuffle/TestShuffleUtils.java        |   6 +-
 .../common/sort/impl/TestPipelinedSorter.java   |  17 ++
 .../sort/impl/dflt/TestDefaultSorter.java       |  34 ++++
 .../TestOrderedPartitionedKVEdgeConfig.java     |   4 +
 .../library/output/TestOnFileSortedOutput.java  |   8 +
 19 files changed, 546 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 60ea352..023be60 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES
+  TEZ-2496. Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source.
   TEZ-2995. Timeline primary filter should only be on callerId and not type.
   TEZ-2943. Change shuffle vertex manager to use per vertex data for auto
   reduce and slow start

http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 169353f..bc8f487 100644
--- a/pom.xml
+++ b/pom.xml
@@ -181,6 +181,11 @@
         <version>3.1.0</version>
       </dependency>
       <dependency>
+        <groupId>org.roaringbitmap</groupId>
+        <artifactId>RoaringBitmap</artifactId>
+        <version>0.4.9</version>
+      </dependency>
+      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>1.7.5</version>

http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/findbugs-exclude.xml b/tez-runtime-library/findbugs-exclude.xml
index 45c194c..6b9e851 100644
--- a/tez-runtime-library/findbugs-exclude.xml
+++ b/tez-runtime-library/findbugs-exclude.xml
@@ -99,4 +99,13 @@
   </Match>
 
 
+  <Match>
+    <Bug pattern="EI_EXPOSE_REP"/>
+    <Or>
+      <Class name="org.apache.tez.runtime.library.common.sort.impl.ExteralSorter" />
+      <Method name="getPartitionStats"/>
+      <Field name="partitionStats"/>
+    </Or>
+  </Match>
+
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/pom.xml b/tez-runtime-library/pom.xml
index 9230576..a68ee88 100644
--- a/tez-runtime-library/pom.xml
+++ b/tez-runtime-library/pom.xml
@@ -26,6 +26,10 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.roaringbitmap</groupId>
+      <artifactId>RoaringBitmap</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.tez</groupId>
       <artifactId>tez-api</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 01dc5a0..1950df2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -28,6 +28,9 @@ import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
+import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -59,12 +62,17 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexMan
 
 import javax.annotation.Nullable;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -136,7 +144,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   int totalNumBipartiteSourceTasks = 0;
   int numBipartiteSourceTasksCompleted = 0;
   int numVertexManagerEventsReceived = 0;
-  List<Integer> pendingTasks = Lists.newLinkedList();
+  List<PendingTaskInfo> pendingTasks = Lists.newLinkedList();
   int totalTasksToSchedule = 0;
   private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
   
@@ -150,6 +158,12 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   long completedSourceTasksOutputSize = 0;
   List<VertexStateUpdate> pendingStateUpdates = Lists.newArrayList();
 
+  private int[][] targetIndexes;
+  private int basePartitionRange;
+  private int remainderRangeForLastShuffler;
+  @VisibleForTesting
+  long[] stats; //approximate amount of data to be fetched
+
   static class SourceVertexInfo {
     EdgeProperty edgeProperty;
     boolean vertexIsConfigured;
@@ -172,10 +186,32 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     }
   }
 
+  static class PendingTaskInfo {
+    private int index;
+    private long outputStats;
+
+    public PendingTaskInfo(int index) {
+      this.index = index;
+    }
+
+    public String toString() {
+      return "[index=" + index + ", outputStats=" + outputStats + "]";
+    }
+  }
+
   public ShuffleVertexManager(VertexManagerPluginContext context) {
     super(context);
   }
 
+  static int[] createIndices(int partitionRange, int taskIndex, int offSetPerTask) {
+    int startIndex = taskIndex * offSetPerTask;
+    int[] indices = new int[partitionRange];
+    for (int currentIndex = 0; currentIndex < partitionRange; ++currentIndex) {
+      indices[currentIndex] = (startIndex + currentIndex);
+    }
+    return indices;
+  }
+
   public static class CustomShuffleEdgeManager extends EdgeManagerPluginOnDemand {
     int numSourceTaskOutputs;
     int numDestinationTasks;
@@ -242,7 +278,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       } else {
         partitionRange = remainderRangeForLastShuffler;
       }
-      
+
       // all inputs from a source task are next to each other in original order
       int targetIndex = 
           sourceTaskIndex * partitionRange 
@@ -273,14 +309,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       return EventRouteMetadata.create(1, new int[]{targetIndex});
     }
     
-    private int[] createIndices(int partitionRange, int taskIndex, int offSetPerTask) {
-      int startIndex = taskIndex * offSetPerTask;
-      int[] indices = new int[partitionRange];
-      for (int currentIndex = 0; currentIndex < partitionRange; ++currentIndex) {
-        indices[currentIndex] = (startIndex + currentIndex);
-      }
-      return indices;      
-    }
+
     
     @Override
     public void prepareForRouting() throws Exception {
@@ -494,6 +523,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     schedulePendingTasks();
   }
 
+
   @Override
   public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
     String srcVertexName = attempt.getTaskIdentifier().getVertexIdentifier().getName();
@@ -516,7 +546,25 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     }
     schedulePendingTasks();
   }
-  
+
+  @VisibleForTesting
+  void parsePartitionStats(RoaringBitmap partitionStats) {
+    Preconditions.checkState(stats != null, "Stats should be initialized");
+    Iterator<Integer> it = partitionStats.iterator();
+    final DATA_RANGE_IN_MB[] RANGES = DATA_RANGE_IN_MB.values();
+    final int RANGE_LEN = RANGES.length;
+    while (it.hasNext()) {
+      int pos = it.next();
+      int index = ((pos) / RANGE_LEN);
+      int rangeIndex = ((pos) % RANGE_LEN);
+      //Add to aggregated stats and normalize to DATA_RANGE_IN_MB.
+      if (RANGES[rangeIndex].getSizeInMB() > 0) {
+        stats[index] += RANGES[rangeIndex].getSizeInMB();
+      }
+    }
+  }
+
+
   @Override
   public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
     // currently events from multiple attempts of the same task can be ignored because
@@ -544,6 +592,19 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       }
       sourceTaskOutputSize = proto.getOutputSize();
 
+      if (proto.hasPartitionStats()) {
+        try {
+          RoaringBitmap partitionStats = new RoaringBitmap();
+          ByteString compressedPartitionStats = proto.getPartitionStats();
+          byte[] rawData = TezCommonUtils.decompressByteStringToByteArray(compressedPartitionStats);
+          ByteArrayInputStream bin = new ByteArrayInputStream(rawData);
+          partitionStats.deserialize(new DataInputStream(bin));
+
+          parsePartitionStats(partitionStats);
+        } catch (IOException e) {
+          throw new TezUncheckedException(e);
+        }
+      }
       srcInfo.numVMEventsReceived++;
       srcInfo.outputSize += sourceTaskOutputSize;
       completedSourceTasksOutputSize += sourceTaskOutputSize;
@@ -558,13 +619,17 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
           + " total output size: " + completedSourceTasksOutputSize);
     }
   }
-  
+
+
   void updatePendingTasks() {
     pendingTasks.clear();
-    for (int i=0; i<getContext().getVertexNumTasks(getContext().getVertexName()); ++i) {
-      pendingTasks.add(i);
+    for (int i = 0; i < getContext().getVertexNumTasks(getContext().getVertexName()); ++i) {
+      pendingTasks.add(new PendingTaskInfo(i));
     }
     totalTasksToSchedule = pendingTasks.size();
+    if (stats == null) {
+      stats = new long[totalTasksToSchedule]; // TODO lost previous data
+    }
   }
 
   Iterable<Map.Entry<String, SourceVertexInfo>> getBipartiteInfo() {
@@ -633,7 +698,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     }
     
     // most shufflers will be assigned this range
-    int basePartitionRange = currentParallelism/desiredTaskParallelism;
+    basePartitionRange = currentParallelism/desiredTaskParallelism;
     
     if (basePartitionRange <= 1) {
       // nothing to do if range is equal 1 partition. shuffler does it by default
@@ -641,7 +706,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     }
     
     int numShufflersWithBaseRange = currentParallelism / basePartitionRange;
-    int remainderRangeForLastShuffler = currentParallelism % basePartitionRange; 
+    remainderRangeForLastShuffler = currentParallelism % basePartitionRange;
     
     int finalTaskParallelism = (remainderRangeForLastShuffler > 0) ?
           (numShufflersWithBaseRange + 1) : (numShufflersWithBaseRange);
@@ -678,14 +743,30 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
             oldEdgeProp.getEdgeSource(), oldEdgeProp.getEdgeDestination());
         edgeProperties.put(vertex, newEdgeProp);
       }
-      
+
       getContext().reconfigureVertex(finalTaskParallelism, null, edgeProperties);
-      
       updatePendingTasks();
+      configureTargetMapping(finalTaskParallelism);
     }
     return true;
   }
 
+  void configureTargetMapping(int tasks) {
+    targetIndexes = new int[tasks][];
+    for (int idx = 0; idx < tasks; ++idx) {
+      int partitionRange = basePartitionRange;
+      if (idx == (tasks - 1)) {
+        partitionRange = ((remainderRangeForLastShuffler > 0)
+            ? remainderRangeForLastShuffler : basePartitionRange);
+      }
+      // skip the basePartitionRange per destination task
+      targetIndexes[idx] = createIndices(partitionRange, idx, basePartitionRange);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("targetIdx[" + idx + "] to " + Arrays.toString(targetIndexes[idx]));
+      }
+    }
+  }
+
   void schedulePendingTasks(int numTasksToSchedule, float minSourceVertexCompletedTaskFraction) {
     // determine parallelism before scheduling the first time
     // this is the latest we can wait before determining parallelism.
@@ -702,11 +783,15 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       }
       getContext().doneReconfiguringVertex();
     }
+    if (totalNumBipartiteSourceTasks > 0) {
+      //Sort in case partition stats are available
+      sortPendingTasksBasedOnDataSize();
+    }
     List<ScheduleTaskRequest> scheduledTasks = Lists.newArrayListWithCapacity(numTasksToSchedule);
 
     while(!pendingTasks.isEmpty() && numTasksToSchedule > 0) {
       numTasksToSchedule--;
-      Integer taskIndex = pendingTasks.get(0);
+      Integer taskIndex = pendingTasks.get(0).index;
       scheduledTasks.add(ScheduleTaskRequest.create(taskIndex, null));
       pendingTasks.remove(0);
     }
@@ -718,6 +803,59 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     }
   }
 
+  private void sortPendingTasksBasedOnDataSize() {
+    //Get partition sizes from all source vertices
+    boolean statsUpdated = computePartitionSizes();
+
+    if (statsUpdated) {
+      //Order the pending tasks based on task size in reverse order
+      Collections.sort(pendingTasks, new Comparator<PendingTaskInfo>() {
+        @Override
+        public int compare(PendingTaskInfo left, PendingTaskInfo right) {
+          return (left.outputStats > right.outputStats) ? -1 :
+              ((left.outputStats == right.outputStats) ? 0 : 1);
+        }
+      });
+
+      if (LOG.isDebugEnabled()) {
+        for (PendingTaskInfo pendingTask : pendingTasks) {
+          LOG.debug("Pending task:" + pendingTask.toString());
+        }
+      }
+    }
+  }
+
+  /**
+   * Compute partition sizes in case statistics are available in vertex.
+   *
+   * @return boolean indicating whether stats are computed
+   */
+  private synchronized boolean computePartitionSizes() {
+    boolean computedPartitionSizes = false;
+    for (PendingTaskInfo taskInfo : pendingTasks) {
+      int index = taskInfo.index;
+      if (targetIndexes != null) { //parallelism has changed.
+        Preconditions.checkState(index < targetIndexes.length,
+            "index=" + index +", targetIndexes length=" + targetIndexes.length);
+        int[] mapping = targetIndexes[index];
+        long totalStats = 0;
+        for (int i : mapping) {
+          totalStats += stats[i];
+        }
+        if ((totalStats > 0) && (taskInfo.outputStats != totalStats)) {
+          computedPartitionSizes = true;
+          taskInfo.outputStats = totalStats;
+        }
+      } else {
+        if ((stats[index] > 0) && (stats[index] != taskInfo.outputStats)) {
+          computedPartitionSizes = true;
+          taskInfo.outputStats = stats[index];
+        }
+      }
+    }
+    return computedPartitionSizes;
+  }
+
   /**
    * Verify whether each of the source vertices have completed at least 1 task
    *
@@ -865,7 +1003,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
         + slowStartMaxSrcCompletionFraction + " auto:" + enableAutoParallelism
         + " desiredTaskIput:" + desiredTaskInputDataSize + " minTasks:"
         + minTaskParallelism);
-    
+
     if (enableAutoParallelism) {
       getContext().vertexReconfigurationPlanned();
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 3cfbf8e..440c9f4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -134,6 +134,14 @@ public class TezRuntimeConfiguration {
   public static final int TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS_DEFAULT = 2;
 
   /**
+   * Report partition statistics (e.g better scheduling in ShuffleVertexManager). TEZ-2496
+   * This can be enabled/disabled at vertex level.
+   */
+  public static final String TEZ_RUNTIME_REPORT_PARTITION_STATS = TEZ_RUNTIME_PREFIX +
+      "report.partition.stats";
+  public static final boolean TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT = true;
+
+  /**
    * Size of the buffer to use if not writing directly to disk.
    */
   public static final String TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB = TEZ_RUNTIME_PREFIX +
@@ -457,6 +465,7 @@ public class TezRuntimeConfiguration {
     tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION);
     tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION);
     tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION);
+    tezRuntimeKeys.add(TEZ_RUNTIME_REPORT_PARTITION_STATS);
     tezRuntimeKeys.add(TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT);
     tezRuntimeKeys.add(TEZ_RUNTIME_GROUP_COMPARATOR_CLASS);
     tezRuntimeKeys.add(TEZ_RUNTIME_INTERNAL_SORTER_CLASS);

http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 1873485..8aca3af 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -29,11 +29,15 @@ import java.text.DecimalFormat;
 import java.util.BitSet;
 import java.util.List;
 
+import javax.annotation.Nullable;
 import javax.crypto.SecretKey;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
+import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -406,12 +410,13 @@ public class ShuffleUtils {
    * @param spillRecord
    * @param numPhysicalOutputs
    * @param pathComponent
+   * @param partitionStats
    * @throws IOException
    */
   public static void generateEventOnSpill(List<Event> eventList, boolean finalMergeEnabled,
       boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord spillRecord,
-      int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent)
-      throws IOException {
+      int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent,
+      @Nullable long[] partitionStats) throws IOException {
     Preconditions.checkArgument(eventList != null, "EventList can't be null");
 
     if (finalMergeEnabled) {
@@ -439,6 +444,16 @@ public class ShuffleUtils {
       // up adding up to final outputsize.  This is needed for auto-reduce parallelism to work
       // properly.
       vmBuilder.setOutputSize(outputSize);
+
+      //set partition stats
+      if (partitionStats != null && partitionStats.length > 0) {
+        RoaringBitmap stats = getPartitionStatsForPhysicalOutput(partitionStats);
+        DataOutputBuffer dout = new DataOutputBuffer();
+        stats.serialize(dout);
+        ByteString partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString(dout.getData());
+        vmBuilder.setPartitionStats(partitionStatsBytes);
+      }
+
       VertexManagerEvent vmEvent = VertexManagerEvent.create(
           context.getDestinationVertexName(), vmBuilder.build().toByteString().asReadOnlyByteBuffer());
       eventList.add(vmEvent);
@@ -450,6 +465,24 @@ public class ShuffleUtils {
     eventList.add(csdme);
   }
 
+  /**
+   * Data size for the destinations
+   *
+   * @param sizes for physical outputs
+   */
+  public static RoaringBitmap getPartitionStatsForPhysicalOutput(long[] sizes) {
+    RoaringBitmap partitionStats = new RoaringBitmap();
+    if (sizes == null || sizes.length == 0) {
+      return partitionStats;
+    }
+    final int RANGE_LEN = DATA_RANGE_IN_MB.values().length;
+    for (int i = 0; i < sizes.length; i++) {
+      int bucket = DATA_RANGE_IN_MB.getRange(sizes[i]).ordinal();
+      int index = i * (RANGE_LEN);
+      partitionStats.add(index + bucket);
+    }
+    return partitionStats;
+  }
 
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index aba04e0..ac5acb8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -26,6 +26,7 @@ import java.util.Map;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
+import org.apache.tez.runtime.api.OutputStatisticsReporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -69,6 +70,7 @@ public abstract class ExternalSorter {
   public void close() throws IOException {
     spillFileIndexPaths.clear();
     spillFilePaths.clear();
+    reportStatistics();
   }
 
   public abstract void flush() throws IOException;
@@ -116,6 +118,8 @@ public abstract class ExternalSorter {
   protected Path finalIndexFile;
   protected int numSpills;
 
+  protected OutputStatisticsReporter statsReporter;
+  protected final long[] partitionStats;
   protected final boolean finalMergeEnabled;
   protected final boolean sendEmptyPartitionDetails;
 
@@ -152,6 +156,10 @@ public abstract class ExternalSorter {
     this.outputContext = outputContext;
     this.conf = conf;
     this.partitions = numOutputs;
+    boolean reportPartitionStats = conf.getBoolean(TezRuntimeConfiguration
+            .TEZ_RUNTIME_REPORT_PARTITION_STATS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT);
+    this.partitionStats = (reportPartitionStats) ? (new long[partitions]) : null;
 
     rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw();
 
@@ -241,6 +249,8 @@ public abstract class ExternalSorter {
     this.conf.setInt(TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, this.partitions);
     this.partitioner = TezRuntimeUtils.instantiatePartitioner(this.conf);
     this.combiner = TezRuntimeUtils.instantiateCombiner(this.conf, outputContext);
+
+    this.statsReporter = outputContext.getStatisticsReporter();
     this.finalMergeEnabled = conf.getBoolean(
         TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT,
         TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT);
@@ -339,4 +349,21 @@ public abstract class ExternalSorter {
   public int getNumSpills() {
     return numSpills;
   }
+
+  public long[] getPartitionStats() {
+    return partitionStats;
+  }
+
+  protected boolean reportPartitionStats() {
+    return (partitionStats != null);
+  }
+
+  protected synchronized void reportStatistics() {
+    // This works for non-started outputs since new counters will be created with an initial value of 0
+    long outputSize = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
+    statsReporter.reportDataSize(outputSize);
+    long outputRecords = outputContext.getCounters()
+        .findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
+    statsReporter.reportItemsProcessed(outputRecords);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 049087b..9708d7c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -271,7 +271,7 @@ public class PipelinedSorter extends ExternalSorter {
     String pathComponent = (outputContext.getUniqueIdentifier() + "_" + (numSpills-1));
     ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false, outputContext,
         (numSpills - 1), indexCacheList.get(numSpills - 1), partitions, sendEmptyPartitionDetails,
-        pathComponent);
+        pathComponent, partitionStats);
     outputContext.sendEvents(events);
     LOG.info(outputContext.getDestinationVertexName() +
         ": Added spill event for spill (final update=false), spillId=" + (numSpills - 1));
@@ -476,6 +476,9 @@ public class PipelinedSorter extends ExternalSorter {
                 writer.getRawLength(),
                 writer.getCompressedLength());
         spillRec.putIndex(rec, i);
+        if (!isFinalMergeEnabled() && reportPartitionStats()) {
+          partitionStats[i] += writer.getCompressedLength();
+        }
       }
 
       Path indexFilename =
@@ -538,7 +541,7 @@ public class PipelinedSorter extends ExternalSorter {
         String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i);
         ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
             outputContext, i, indexCacheList.get(i), partitions,
-            sendEmptyPartitionDetails, pathComponent);
+            sendEmptyPartitionDetails, pathComponent, partitionStats);
         LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
       }
       outputContext.sendEvents(events);
@@ -564,6 +567,12 @@ public class PipelinedSorter extends ExternalSorter {
             + "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" +
             indexFilename);
       }
+      TezSpillRecord spillRecord = new TezSpillRecord(finalIndexFile, conf);
+      if (reportPartitionStats()) {
+        for (int i = 0; i < spillRecord.size(); i++) {
+          partitionStats[i] += spillRecord.getIndex(i).getPartLength();
+        }
+      }
       numShuffleChunks.setValue(numSpills);
       fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
       return;
@@ -638,6 +647,9 @@ public class PipelinedSorter extends ExternalSorter {
               writer.getRawLength(), 
               writer.getCompressedLength());
       spillRec.putIndex(rec, parts);
+      if (reportPartitionStats()) {
+        partitionStats[parts] += writer.getCompressedLength();
+      }
     }
 
     numShuffleChunks.setValue(1); //final merge has happened.

http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index ac90112..6c15a5d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -699,8 +699,6 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
     }
   }
 
-  @Override
-  public void close() throws IOException { }
 
   protected class SpillThread extends Thread {
 
@@ -883,6 +881,9 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
                   writer.getRawLength(),
                   writer.getCompressedLength());
           spillRec.putIndex(rec, i);
+          if (!isFinalMergeEnabled() && reportPartitionStats()) {
+            partitionStats[i] += writer.getCompressedLength();
+          }
 
           writer = null;
         } finally {
@@ -1079,7 +1080,8 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
 
     String pathComponent = (outputContext.getUniqueIdentifier() + "_" + index);
     ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
-        outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent);
+        outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent,
+        partitionStats);
 
     LOG.info(outputContext.getDestinationVertexName() + ": " +
         "Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index);
@@ -1127,19 +1129,22 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
       finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
     }
     if (numSpills == 1) { //the spill is the final output
+      TezSpillRecord spillRecord = null;
       if (isFinalMergeEnabled()) {
         finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename[0]);
         finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]);
         sameVolRename(filename[0], finalOutputFile);
         if (indexCacheList.size() == 0) {
           sameVolRename(spillFileIndexPaths.get(0), finalIndexFile);
+          spillRecord = new TezSpillRecord(finalIndexFile, conf);
         } else {
-          indexCacheList.get(0).writeToFile(finalIndexFile, conf);
+          spillRecord = indexCacheList.get(0);
+          spillRecord.writeToFile(finalIndexFile, conf);
         }
       } else {
         List<Event> events = Lists.newLinkedList();
         //Since there is only one spill, spill record would be present in cache.
-        TezSpillRecord spillRecord = indexCacheList.get(0);
+        spillRecord = indexCacheList.get(0);
         Path indexPath = mapOutputFile.getSpillIndexFileForWrite(numSpills-1, partitions *
             MAP_OUTPUT_INDEX_RECORD_LENGTH);
         spillRecord.writeToFile(indexPath, conf);
@@ -1147,6 +1152,11 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
         fileOutputByteCounter.increment(rfs.getFileStatus(spillFilePaths.get(0)).getLen());
         //No need to populate finalIndexFile, finalOutputFile etc when finalMerge is disabled
       }
+      if (spillRecord != null && reportPartitionStats()) {
+        for(int i=0; i < spillRecord.size(); i++) {
+          partitionStats[i] += spillRecord.getIndex(i).getPartLength();
+        }
+      }
       numShuffleChunks.setValue(numSpills);
       return;
     }
@@ -1276,6 +1286,9 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
                 writer.getRawLength(),
                 writer.getCompressedLength());
         spillRec.putIndex(rec, parts);
+        if (reportPartitionStats()) {
+          partitionStats[parts] += writer.getCompressedLength();
+        }
       }
       numShuffleChunks.setValue(1); //final merge has happened
       spillRec.writeToFile(finalIndexFile, conf);

http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 2b4c0f4..e418c1b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
@@ -191,13 +190,6 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
       returnEvents = generateEmptyEvents();
     }
 
-    // This works for non-started outputs since new counters will be created with an initial value of 0
-    long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
-    getContext().getStatisticsReporter().reportDataSize(outputSize);
-    long outputRecords = getContext().getCounters()
-        .findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
-    getContext().getStatisticsReporter().reportItemsProcessed(outputRecords);
-
     return returnEvents;
   }
 
@@ -207,7 +199,8 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
       boolean isLastEvent = true;
       ShuffleUtils.generateEventOnSpill(eventList, finalMergeEnabled, isLastEvent,
           getContext(), 0, new TezSpillRecord(sorter.getFinalIndexFile(), conf),
-          getNumPhysicalOutputs(), sendEmptyPartitionDetails, getContext().getUniqueIdentifier());
+          getNumPhysicalOutputs(), sendEmptyPartitionDetails, getContext().getUniqueIdentifier(),
+          sorter.getPartitionStats());
     }
     return eventList;
   }
@@ -228,6 +221,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS);

http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/DATA_RANGE_IN_MB.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/DATA_RANGE_IN_MB.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/DATA_RANGE_IN_MB.java
new file mode 100644
index 0000000..126f04e
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/DATA_RANGE_IN_MB.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.utils;
+
+import org.apache.commons.math3.util.FastMath;
+
+public enum DATA_RANGE_IN_MB {
+  THOUSAND(1000), HUNDRED(100), TEN(10), ONE(1), ZERO(0);
+
+  private final int sizeInMB;
+
+  private DATA_RANGE_IN_MB(int sizeInMB) {
+    this.sizeInMB = sizeInMB;
+  }
+
+  public final int getSizeInMB() {
+    return sizeInMB;
+  }
+
+  static long ceil(long a, long b) {
+    return (a + (b - 1)) / b;
+  }
+
+  public static final DATA_RANGE_IN_MB getRange(long sizeInBytes) {
+    long sizeInMB = ceil(sizeInBytes, (1024l * 1024l));
+    for (DATA_RANGE_IN_MB range : values()) {
+      if (sizeInMB >= range.sizeInMB) {
+        return range;
+      }
+    }
+    return ZERO;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/proto/ShufflePayloads.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
index f7b482d..9b0fc16 100644
--- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto
+++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
@@ -43,6 +43,7 @@ message InputInformationEventPayloadProto {
 
 message VertexManagerEventPayloadProto {
   optional int64 output_size = 1;
+  optional bytes partition_stats = 2;
 }
 
 message ShuffleEdgeManagerConfigPayloadProto {

http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 9a9ff27..9d53ebc 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -18,10 +18,14 @@
 
 package org.apache.tez.dag.library.vertexmanager;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import com.google.protobuf.ByteString;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.EdgeManagerPlugin;
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
@@ -48,11 +52,13 @@ import org.apache.tez.runtime.api.VertexIdentifier;
 import org.apache.tez.runtime.api.VertexStatistics;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.roaringbitmap.RoaringBitmap;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -301,6 +307,43 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(5000L, manager.completedSourceTasksOutputSize);
 
     /**
+     * Test partition stats
+     */
+    scheduledTasks.clear();
+    //{5,9,12,18} in bitmap
+    long[] sizes = new long[]{(0l), (1000l * 1000l),
+                              (1010 * 1000l * 1000l), (50 * 1000l * 1000l)};
+    vmEvent = getVertexManagerEvent(sizes, 1L, "Vertex");
+
+    manager = createManager(conf, mockContext, 0.01f, 0.75f);
+    manager.onVertexStarted(emptyCompletions);
+    Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
+    Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
+
+    TezTaskAttemptID taId1 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0");
+    vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId1));
+    manager.onVertexManagerEventReceived(vmEvent);
+    Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
+
+    Assert.assertEquals(4, manager.stats.length);
+    Assert.assertEquals(0, manager.stats[0]); //0 MB bucket
+    Assert.assertEquals(1, manager.stats[1]); //1 MB bucket
+    Assert.assertEquals(100, manager.stats[2]); //100 MB bucket
+    Assert.assertEquals(10, manager.stats[3]); //10 MB bucket
+
+    // sending again from a different version of the same task has not impact
+    TezTaskAttemptID taId2 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_1");
+    vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId2));
+    manager.onVertexManagerEventReceived(vmEvent);
+    Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
+
+    Assert.assertEquals(4, manager.stats.length);
+    Assert.assertEquals(0, manager.stats[0]); //0 MB bucket
+    Assert.assertEquals(1, manager.stats[1]); //1 MB bucket
+    Assert.assertEquals(100, manager.stats[2]); //100 MB bucket
+    Assert.assertEquals(10, manager.stats[3]); //10 MB bucket
+
+    /**
      * Test for TEZ-978
      * Delay determining parallelism until enough data has been received.
      */
@@ -511,6 +554,7 @@ public class TestShuffleVertexManager {
     String mockManagedVertexId = "Vertex4";
     
     VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+    when(mockContext.getVertexStatistics(any(String.class))).thenReturn(mock(VertexStatistics.class));
     when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
     when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
@@ -631,7 +675,7 @@ public class TestShuffleVertexManager {
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
 
     // source vertex have some tasks. min, max == 0
-    manager = createManager(conf, mockContext, 0.f, 0.f);
+    manager = createManager(conf, mockContext, 0.0f, 0.0f);
     manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.totalTasksToSchedule == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
@@ -926,7 +970,6 @@ public class TestShuffleVertexManager {
       throws IOException {
     ByteBuffer payload = null;
     if (sizes != null) {
-      /*
       RoaringBitmap partitionStats = ShuffleUtils.getPartitionStatsForPhysicalOutput(sizes);
       DataOutputBuffer dout = new DataOutputBuffer();
       partitionStats.serialize(dout);
@@ -938,7 +981,6 @@ public class TestShuffleVertexManager {
               .setPartitionStats(partitionStatsBytes)
               .build().toByteString()
               .asReadOnlyByteBuffer();
-              */
     } else {
       payload =
           VertexManagerEventPayloadProto.newBuilder()
@@ -954,6 +996,108 @@ public class TestShuffleVertexManager {
   }
 
   @Test(timeout = 5000)
+  public void testSchedulingWithPartitionStats() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(
+        ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
+        true);
+    conf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 1000L);
+    ShuffleVertexManager manager = null;
+
+    HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
+    String r1 = "R1";
+    EdgeProperty eProp1 = EdgeProperty.create(
+        EdgeProperty.DataMovementType.SCATTER_GATHER,
+        EdgeProperty.DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
+    String m2 = "M2";
+    EdgeProperty eProp2 = EdgeProperty.create(
+        EdgeProperty.DataMovementType.BROADCAST,
+        EdgeProperty.DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
+    String m3 = "M3";
+    EdgeProperty eProp3 = EdgeProperty.create(
+        EdgeProperty.DataMovementType.BROADCAST,
+        EdgeProperty.DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
+
+    final String mockManagedVertexId = "R2";
+
+    mockInputVertices.put(r1, eProp1);
+    mockInputVertices.put(m2, eProp2);
+    mockInputVertices.put(m3, eProp3);
+
+    VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+    when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
+    when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
+    when(mockContext.getVertexNumTasks(r1)).thenReturn(3);
+    when(mockContext.getVertexNumTasks(m2)).thenReturn(3);
+    when(mockContext.getVertexNumTasks(m3)).thenReturn(3);
+
+    final List<Integer> scheduledTasks = Lists.newLinkedList();
+    doAnswer(new Answer() {
+      public Object answer(InvocationOnMock invocation) {
+        Object[] args = invocation.getArguments();
+        scheduledTasks.clear();
+        List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+        for (ScheduleTaskRequest task : tasks) {
+          scheduledTasks.add(task.getTaskIndex());
+        }
+        return null;
+      }}).when(mockContext).scheduleTasks(anyList());
+
+    // check initialization
+    manager = createManager(conf, mockContext, 0.001f, 0.001f);
+    manager.onVertexStarted(emptyCompletions);
+    Assert.assertTrue(manager.bipartiteSources == 1);
+
+    manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
+
+    Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
+    Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
+    Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
+
+    //Send an event for r1.
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
+    Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
+
+    //Tasks should be scheduled in task 2, 0, 1 order
+    long[] sizes = new long[]{(100 * 1000l * 1000l), (0l), (5000 * 1000l * 1000l)};
+    VertexManagerEvent vmEvent = getVertexManagerEvent(sizes, 1060000000, r1);
+    manager.onVertexManagerEventReceived(vmEvent); //send VM event
+
+    //stats from another vertex (more of empty stats)
+    sizes = new long[]{(0l), (0l), (0l)};
+    vmEvent = getVertexManagerEvent(sizes, 1060000000, r1);
+    manager.onVertexManagerEventReceived(vmEvent); //send VM event
+
+    //Send an event for m2.
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0));
+    Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
+
+    //Send an event for m3.
+    manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
+    Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
+    Assert.assertTrue(scheduledTasks.size() == 3);
+
+    //Order of scheduling should be 2,0,1 based on the available partition statistics
+    Assert.assertTrue(scheduledTasks.get(0) == 2);
+    Assert.assertTrue(scheduledTasks.get(1) == 0);
+    Assert.assertTrue(scheduledTasks.get(2) == 1);
+  }
+
+  @Test(timeout = 5000)
   public void test_Tez1649_with_mixed_edges() {
     Configuration conf = new Configuration();
     conf.setBoolean(

http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index 2e264f6..9f9cd59 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -148,7 +148,7 @@ public class TestShuffleUtils {
     int physicalOutputs = 10;
     String pathComponent = "/attempt_x_y_0/file.out";
     ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext,
-        spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent);
+        spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent, null);
 
     Assert.assertTrue(events.size() == 1);
     Assert.assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
@@ -186,7 +186,7 @@ public class TestShuffleUtils {
 
     //normal code path where we do final merge all the time
     ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext,
-        spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent);
+        spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent, null);
 
     Assert.assertTrue(events.size() == 2); //one for VM
     Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
@@ -226,7 +226,7 @@ public class TestShuffleUtils {
 
     //normal code path where we do final merge all the time
     ShuffleUtils.generateEventOnSpill(events, finalMergeDisabled, isLastEvent, outputContext,
-        spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent);
+        spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent, null);
 
     Assert.assertTrue(events.size() == 2); //one for VM
     Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);

http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 4aa53eb..129f4d1 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -19,6 +19,7 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.OutputStatisticsReporter;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
@@ -131,6 +132,14 @@ public class TestPipelinedSorter {
   }
 
   @Test
+  public void testWithoutPartitionStats() throws IOException {
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, false);
+    //# partition, # of keys, size per key, InitialMem, blockSize
+    basicTest(1, 0, 0, (10 * 1024l * 1024l), 3 << 20);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, true);
+  }
+
+  @Test
   public void testWithEmptyData() throws IOException {
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
     //# partition, # of keys, size per key, InitialMem, blockSize
@@ -328,6 +337,13 @@ public class TestPipelinedSorter {
 
     writeData(sorter, numKeys, keySize);
 
+    //partition stats;
+    boolean partitionStats = conf.getBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_REPORT_PARTITION_STATS, TezRuntimeConfiguration
+        .TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT);
+    if (partitionStats) {
+      assertTrue(sorter.getPartitionStats() != null);
+    }
 
     verifyCounters(sorter, outputContext);
     Path outputFile = sorter.finalOutputFile;
@@ -471,6 +487,7 @@ public class TestPipelinedSorter {
             (ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
 
     doReturn(execContext).when(outputContext).getExecutionContext();
+    doReturn(mock(OutputStatisticsReporter.class)).when(outputContext).getStatisticsReporter();
     doReturn(counters).when(outputContext).getCounters();
     doReturn(appId).when(outputContext).getApplicationId();
     doReturn(1).when(outputContext).getDAGAttemptNumber();

http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index ecc44a7..4022525 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -53,6 +53,7 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.OutputStatisticsReporter;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
@@ -307,6 +308,38 @@ public class TestDefaultSorter {
     }
   }
 
+  void testPartitionStats(boolean withStats) throws IOException {
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, withStats);
+    OutputContext context = createTezOutputContext();
+
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
+    conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
+    MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
+    context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
+        context.getTotalMemoryAvailableToTask()), handler);
+    DefaultSorter sorter = new DefaultSorter(context, conf, 1, handler.getMemoryAssigned());
+
+    writeData(sorter, 1000, 10);
+    assertTrue(sorter.getNumSpills() == 1);
+    verifyCounters(sorter, context);
+
+    if (withStats) {
+      assertTrue(sorter.getPartitionStats() != null);
+    } else {
+      assertTrue(sorter.getPartitionStats() == null);
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testWithPartitionStats() throws IOException {
+    testPartitionStats(true);
+  }
+
+  @Test(timeout = 60000)
+  public void testWithoutPartitionStats() throws IOException {
+    testPartitionStats(false);
+  }
+
   @Test(timeout = 60000)
   @SuppressWarnings("unchecked")
   public void testWithSingleSpillWithFinalMergeDisabled() throws IOException {
@@ -418,6 +451,7 @@ public class TestDefaultSorter {
 
     OutputContext context = mock(OutputContext.class);
     ExecutionContext execContext = new ExecutionContextImpl("localhost");
+    doReturn(mock(OutputStatisticsReporter.class)).when(context).getStatisticsReporter();
     doReturn(execContext).when(context).getExecutionContext();
     doReturn(counters).when(context).getCounters();
     doReturn(workingDirs).when(context).getWorkDirs();

http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
index c72dd52..f57731c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
@@ -127,6 +127,7 @@ public class TestOrderedPartitionedKVEdgeConfig {
     additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111");
     additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, "0.22f");
     additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, "CustomSorter");
+    additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, "true");
     additionalConfs.put("file.shouldExist", "file");
 
     OrderedPartitionedKVEdgeConfig.Builder builder = OrderedPartitionedKVEdgeConfig
@@ -163,6 +164,9 @@ public class TestOrderedPartitionedKVEdgeConfig {
     assertEquals("io", outputConf.get("io.shouldExist"));
     assertEquals("file", outputConf.get("file.shouldExist"));
     assertEquals("fs", outputConf.get("fs.shouldExist"));
+    assertEquals(true, outputConf.getBoolean(TezRuntimeConfiguration
+            .TEZ_RUNTIME_REPORT_PARTITION_STATS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT));
 
 
     assertEquals(3, inputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, 0));

http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index 19eb18a..8942f4b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -35,6 +35,7 @@ import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.OutputStatisticsReporter;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.api.KeyValuesWriter;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
@@ -166,6 +167,7 @@ public class TestOnFileSortedOutput {
 
   private void startSortedOutput(int partitions) throws Exception {
     OutputContext context = createTezOutputContext();
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, true);
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
     UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
     doReturn(payLoad).when(context).getUserPayload();
@@ -292,6 +294,12 @@ public class TestOnFileSortedOutput {
         .parseFrom(
             ByteString.copyFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload()));
 
+    ShuffleUserPayloads.VertexManagerEventPayloadProto
+        vmPayload = ShuffleUserPayloads.VertexManagerEventPayloadProto
+        .parseFrom(
+            ByteString.copyFrom(((VertexManagerEvent) eventList.get(0)).getUserPayload()));
+
+    assertTrue(vmPayload.hasPartitionStats());
     assertEquals(HOST, payload.getHost());
     assertEquals(PORT, payload.getPort());
     assertEquals(UniqueID, payload.getPathComponent());