You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2016/06/16 22:54:43 UTC

tez git commit: TEZ-3216. Add support for more precise partition stats in VertexManagerEvent. Contributed by Ming Ma.

Repository: tez
Updated Branches:
  refs/heads/master cc33410d8 -> 80ba12b2a


TEZ-3216. Add support for more precise partition stats in VertexManagerEvent. Contributed by Ming Ma.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/80ba12b2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/80ba12b2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/80ba12b2

Branch: refs/heads/master
Commit: 80ba12b2ad03ccb860aafc53a46c447aaa242d0d
Parents: cc33410
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Jun 16 15:54:01 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Jun 16 15:54:01 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../library/api/TezRuntimeConfiguration.java    | 79 +++++++++++++++++-
 .../library/common/shuffle/ShuffleUtils.java    | 85 +++++++++++++++-----
 .../common/sort/impl/ExternalSorter.java        | 19 +++--
 .../common/sort/impl/PipelinedSorter.java       | 10 ++-
 .../common/sort/impl/dflt/DefaultSorter.java    |  2 +-
 .../writers/UnorderedPartitionedKVWriter.java   | 72 ++++++++---------
 .../output/OrderedPartitionedKVOutput.java      |  2 +-
 .../library/output/UnorderedKVOutput.java       |  1 +
 .../output/UnorderedPartitionedKVOutput.java    |  1 +
 .../src/main/proto/ShufflePayloads.proto        |  7 ++
 .../common/shuffle/TestShuffleUtils.java        | 15 ++--
 .../common/sort/impl/TestPipelinedSorter.java   | 10 ++-
 .../TestUnorderedPartitionedKVWriter.java       | 81 +++++++++++++------
 .../TestOrderedPartitionedKVEdgeConfig.java     | 10 ++-
 .../library/output/TestOnFileSortedOutput.java  | 54 +++++++++----
 16 files changed, 321 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1e1803d..5f90539 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3216. Add support for more precise partition stats in VertexManagerEvent.
   TEZ-3296. Tez job can hang if two vertices at the same root distance have different task requirements
   TEZ-3294. DAG.createDag() does not clear local state on repeat calls.
   TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce.

http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 08f76f2..4d24bfb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -175,11 +175,16 @@ public class TezRuntimeConfiguration {
   /**
    * Report partition statistics (e.g better scheduling in ShuffleVertexManager). TEZ-2496
    * This can be enabled/disabled at vertex level.
+   * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats}
+   * defines the list of values that can be specified.
+   * TODO TEZ-3303 Given ShuffleVertexManager doesn't consume precise stats
+   * yet. So do not set the value to "precise" yet when ShuffleVertexManager is used.
    */
-  @ConfigurationProperty(type = "boolean")
-  public static final String TEZ_RUNTIME_REPORT_PARTITION_STATS = TEZ_RUNTIME_PREFIX +
-      "report.partition.stats";
-  public static final boolean TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT = true;
+  @ConfigurationProperty
+  public static final String TEZ_RUNTIME_REPORT_PARTITION_STATS =
+      TEZ_RUNTIME_PREFIX + "report.partition.stats";
+  public static final String TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT =
+      ReportPartitionStats.MEMORY_OPTIMIZED.getType();
 
   /**
    * Size of the buffer to use if not writing directly to disk.
@@ -635,4 +640,70 @@ public class TezRuntimeConfiguration {
   public static Map<String, String> getOtherConfigDefaults() {
     return Collections.unmodifiableMap(otherConfMap);
   }
+
+  public enum ReportPartitionStats {
+    @Deprecated
+    /**
+     * Don't report partition stats. It is the same as NONE.
+     * It is defined to maintain backward compatibility given
+     * Configuration @link{#TEZ_RUNTIME_REPORT_PARTITION_STATS} used
+     * to be boolean type.
+     */
+    DISABLED("false"),
+
+    @Deprecated
+    /**
+     * Report partition stats. It is the same as MEMORY_OPTIMIZED.
+     * It is defined to maintain backward compatibility given
+     * Configuration @link{#TEZ_RUNTIME_REPORT_PARTITION_STATS} used
+     * to be boolean type.
+     */
+    ENABLED("true"),
+
+    /**
+     * Don't report partition stats.
+     */
+    NONE("none"),
+
+    /**
+     * Report partition stats with less precision to reduce
+     * memory and CPU overhead
+     */
+    MEMORY_OPTIMIZED("memory_optimized"),
+
+    /**
+     * Report precise partition stats in MB.
+     */
+    PRECISE("precise");
+
+    private final String type;
+
+    private ReportPartitionStats(String type) {
+      this.type = type;
+    }
+
+    public final String getType() {
+      return type;
+    }
+
+    public boolean isEnabled() {
+      return !equals(ReportPartitionStats.DISABLED) &&
+              !equals(ReportPartitionStats.NONE);
+    }
+
+    public boolean isPrecise() {
+      return equals(ReportPartitionStats.PRECISE);
+    }
+
+    public static ReportPartitionStats fromString(String type) {
+      if (type != null) {
+        for (ReportPartitionStats b : ReportPartitionStats.values()) {
+          if (type.equalsIgnoreCase(b.type)) {
+            return b;
+          }
+        }
+      }
+      throw new IllegalArgumentException("Invalid type " + type);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index ae646ea..d74e447 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
 import javax.crypto.SecretKey;
 
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
 import com.google.protobuf.ByteString;
 
 import org.apache.hadoop.conf.Configuration;
@@ -67,11 +68,13 @@ import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DetailedPartitionStatsProto;
 
 public class ShuffleUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(ShuffleUtils.class);
   public static final String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce_shuffle";
+  private static final long MB = 1024l * 1024l;
 
   //Shared by multiple threads
   private static volatile SSLFactory sslFactory;
@@ -400,7 +403,8 @@ public class ShuffleUtils {
   public static void generateEventOnSpill(List<Event> eventList, boolean finalMergeEnabled,
       boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord spillRecord,
       int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent,
-      @Nullable long[] partitionStats) throws IOException {
+      @Nullable long[] partitionStats, boolean reportDetailedPartitionStats)
+      throws IOException {
     Preconditions.checkArgument(eventList != null, "EventList can't be null");
 
     context.notifyProgress();
@@ -420,34 +424,50 @@ public class ShuffleUtils {
         finalMergeEnabled, isLastEvent, pathComponent);
 
     if (finalMergeEnabled || isLastEvent) {
-      ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder =
-          ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder();
-
-      long outputSize = context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
+      VertexManagerEvent vmEvent = generateVMEvent(context, partitionStats,
+          reportDetailedPartitionStats);
+      eventList.add(vmEvent);
+    }
 
-      //Set this information only when required.  In pipelined shuffle, multiple events would end
-      // up adding up to final outputsize.  This is needed for auto-reduce parallelism to work
-      // properly.
-      vmBuilder.setOutputSize(outputSize);
+    CompositeDataMovementEvent csdme =
+        CompositeDataMovementEvent.create(0, numPhysicalOutputs, payload);
+    eventList.add(csdme);
+  }
 
-      //set partition stats
-      if (partitionStats != null && partitionStats.length > 0) {
-        RoaringBitmap stats = getPartitionStatsForPhysicalOutput(partitionStats);
+  public static VertexManagerEvent generateVMEvent(OutputContext context,
+      long[] sizePerPartition, boolean reportDetailedPartitionStats)
+          throws IOException {
+    ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder =
+        ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder();
+
+    long outputSize = context.getCounters().
+        findCounter(TaskCounter.OUTPUT_BYTES).getValue();
+
+    // Set this information only when required.  In pipelined shuffle,
+    // multiple events would end up adding up to final output size.
+    // This is needed for auto-reduce parallelism to work properly.
+    vmBuilder.setOutputSize(outputSize);
+
+    //set partition stats
+    if (sizePerPartition != null && sizePerPartition.length > 0) {
+      if (reportDetailedPartitionStats) {
+        vmBuilder.setDetailedPartitionStats(
+            getDetailedPartitionStatsForPhysicalOutput(sizePerPartition));
+      } else {
+        RoaringBitmap stats = getPartitionStatsForPhysicalOutput(
+            sizePerPartition);
         DataOutputBuffer dout = new DataOutputBuffer();
         stats.serialize(dout);
-        ByteString partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString(dout.getData());
+        ByteString partitionStatsBytes =
+            TezCommonUtils.compressByteArrayToByteString(dout.getData());
         vmBuilder.setPartitionStats(partitionStatsBytes);
       }
-
-      VertexManagerEvent vmEvent = VertexManagerEvent.create(
-          context.getDestinationVertexName(), vmBuilder.build().toByteString().asReadOnlyByteBuffer());
-      eventList.add(vmEvent);
     }
 
-
-    CompositeDataMovementEvent csdme =
-        CompositeDataMovementEvent.create(0, numPhysicalOutputs, payload);
-    eventList.add(csdme);
+    VertexManagerEvent vmEvent = VertexManagerEvent.create(
+        context.getDestinationVertexName(),
+        vmBuilder.build().toByteString().asReadOnlyByteBuffer());
+    return vmEvent;
   }
 
   /**
@@ -469,6 +489,29 @@ public class ShuffleUtils {
     return partitionStats;
   }
 
+  static long ceil(long a, long b) {
+    return (a + (b - 1)) / b;
+  }
+
+  /**
+   * Detailed partition stats
+   *
+   * @param sizes actual partition sizes
+   */
+  public static DetailedPartitionStatsProto
+  getDetailedPartitionStatsForPhysicalOutput(long[] sizes) {
+    DetailedPartitionStatsProto.Builder builder =
+        DetailedPartitionStatsProto.newBuilder();
+    for (int i=0; i<sizes.length; i++) {
+      // Round the size up. So 1 byte -> the value of sizeInMB == 1
+      // Throws IllegalArgumentException if value is greater than
+      // Integer.MAX_VALUE. That should be ok given Integer.MAX_VALUE * MB
+      // means PB.
+      int sizeInMb = Ints.checkedCast(ceil(sizes[i], MB));
+      builder.addSizeInMb(sizeInMb);
+    }
+    return builder.build();
+  }
 
   /**
    * Log individual fetch complete event.

http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index 7a2dc68..b6fe457 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -53,6 +53,7 @@ import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.library.api.Partitioner;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.combine.Combiner;
@@ -159,16 +160,19 @@ public abstract class ExternalSorter {
   protected final TezCounter numAdditionalSpills;
   // Number of files offered via shuffle-handler to consumers.
   protected final TezCounter numShuffleChunks;
+  // How partition stats should be reported.
+  final ReportPartitionStats reportPartitionStats;
 
   public ExternalSorter(OutputContext outputContext, Configuration conf, int numOutputs,
       long initialMemoryAvailable) throws IOException {
     this.outputContext = outputContext;
     this.conf = conf;
     this.partitions = numOutputs;
-    boolean reportPartitionStats = conf.getBoolean(TezRuntimeConfiguration
-            .TEZ_RUNTIME_REPORT_PARTITION_STATS,
-        TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT);
-    this.partitionStats = (reportPartitionStats) ? (new long[partitions]) : null;
+    reportPartitionStats = ReportPartitionStats.fromString(
+        conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT));
+    partitionStats = reportPartitionStats.isEnabled() ?
+        (new long[partitions]) : null;
 
     cleanup = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT,
         TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT_DEFAULT);
@@ -202,7 +206,8 @@ public abstract class ExternalSorter {
         + ", valueSerializerClass=" + valSerializer
         + ", comparator=" + (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf)
         + ", partitioner=" + conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS)
-        + ", serialization=" + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+        + ", serialization=" + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY)
+        + ", reportPartitionStats=" + reportPartitionStats);
 
     //    counters    
     mapOutputByteCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
@@ -412,4 +417,8 @@ public abstract class ExternalSorter {
         .findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
     statsReporter.reportItemsProcessed(outputRecords);
   }
+
+  public boolean reportDetailedPartitionStats() {
+    return reportPartitionStats.isPrecise();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 5695bde..897d7d7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -347,9 +347,10 @@ public class PipelinedSorter extends ExternalSorter {
   private void sendPipelinedShuffleEvents() throws IOException{
     List<Event> events = Lists.newLinkedList();
     String pathComponent = (outputContext.getUniqueIdentifier() + "_" + (numSpills-1));
-    ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false, outputContext,
-        (numSpills - 1), indexCacheList.get(numSpills - 1), partitions, sendEmptyPartitionDetails,
-        pathComponent, partitionStats);
+    ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false,
+        outputContext, (numSpills - 1), indexCacheList.get(numSpills - 1),
+        partitions, sendEmptyPartitionDetails, pathComponent, partitionStats,
+        reportDetailedPartitionStats());
     outputContext.sendEvents(events);
     LOG.info(outputContext.getDestinationVertexName() +
         ": Added spill event for spill (final update=false), spillId=" + (numSpills - 1));
@@ -671,7 +672,8 @@ public class PipelinedSorter extends ExternalSorter {
           String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i);
           ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
               outputContext, i, indexCacheList.get(i), partitions,
-              sendEmptyPartitionDetails, pathComponent, partitionStats);
+              sendEmptyPartitionDetails, pathComponent, partitionStats,
+              reportDetailedPartitionStats());
           LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
         }
         outputContext.sendEvents(events);

http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index a6a60c2..69bfdb8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -1133,7 +1133,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
     String pathComponent = (outputContext.getUniqueIdentifier() + "_" + index);
     ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
         outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent,
-        partitionStats);
+        partitionStats, reportDetailedPartitionStats());
 
     LOG.info(outputContext.getDestinationVertexName() + ": " +
         "Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index);

http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 76075bb..152096c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -56,8 +56,8 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
-import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.api.IOInterruptedException;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
@@ -65,9 +65,7 @@ import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
 import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
-import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -150,6 +148,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   private final Condition spillInProgress = spillLock.newCondition();
 
   private final boolean pipelinedShuffle;
+  // How partition stats should be reported.
+  final ReportPartitionStats reportPartitionStats;
 
   private final long indexFileSizeEstimate;
 
@@ -208,7 +208,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
             .build());
     spillExecutor = MoreExecutors.listeningDecorator(executor);
     numRecordsPerPartition = new int[numPartitions];
-    sizePerPartition = new long[numPartitions];
+    reportPartitionStats = ReportPartitionStats.fromString(
+        conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT));
+    sizePerPartition = (reportPartitionStats.isEnabled()) ?
+        new long[numPartitions] : null;
 
     outputLargeRecordsCounter = outputContext.getCounters().findCounter(
         TaskCounter.OUTPUT_LARGE_RECORDS);
@@ -233,7 +237,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         + ", sizePerBuffer=" + sizePerBuffer
         + ", skipBuffers=" + skipBuffers
         + ", pipelinedShuffle=" + pipelinedShuffle
-        + ", numPartitions=" + numPartitions);
+        + ", numPartitions=" + numPartitions
+        + ", reportPartitionStats=" + reportPartitionStats);
   }
 
   private void computeNumBuffersAndSize(int bufferLimit) {
@@ -364,10 +369,16 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     }
   }
 
+  private boolean reportPartitionStats() {
+    return (sizePerPartition != null);
+  }
+
   private void updateGlobalStats(WrappedBuffer buffer) {
     for (int i = 0; i < numPartitions; i++) {
       numRecordsPerPartition[i] += buffer.recordsPerPartition[i];
-      sizePerPartition[i] += buffer.sizePerPartition[i];
+      if (reportPartitionStats()) {
+        sizePerPartition[i] += buffer.sizePerPartition[i];
+      }
     }
   }
 
@@ -529,7 +540,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
           if (outputRecordsCounter.getValue() == 0) {
             emptyPartitions.set(0);
           }
-          sizePerPartition[0] = rawLen;
+          if (reportPartitionStats()) {
+            sizePerPartition[0] = rawLen;
+          }
           cleanupCurrentBuffer();
 
           outputBytesWithOverheadCounter.increment(rawLen);
@@ -575,37 +588,13 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     return emptyPartitions;
   }
 
-  private Event generateVMEvent() throws IOException {
-    return generateVMEvent(this.sizePerPartition);
+  public boolean reportDetailedPartitionStats() {
+    return reportPartitionStats.isPrecise();
   }
 
-  private Event generateVMEvent(long[] sizePerPartition) throws IOException {
-    ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder =
-        ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder();
-
-    long outputSize = outputContext.getCounters().
-        findCounter(TaskCounter.OUTPUT_BYTES).getValue();
-
-    // Set this information only when required.  In pipelined shuffle,
-    // multiple events would end up adding up to final output size.
-    // This is needed for auto-reduce parallelism to work properly.
-    vmBuilder.setOutputSize(outputSize);
-
-    //set partition stats
-    if (sizePerPartition != null && sizePerPartition.length > 0) {
-      RoaringBitmap stats = ShuffleUtils.getPartitionStatsForPhysicalOutput(
-          sizePerPartition);
-      DataOutputBuffer dout = new DataOutputBuffer();
-      stats.serialize(dout);
-      ByteString partitionStatsBytes =
-          TezCommonUtils.compressByteArrayToByteString(dout.getData());
-      vmBuilder.setPartitionStats(partitionStatsBytes);
-    }
-
-    VertexManagerEvent vmEvent = VertexManagerEvent.create(
-        outputContext.getDestinationVertexName(),
-            vmBuilder.build().toByteString().asReadOnlyByteBuffer());
-    return vmEvent;
+  private Event generateVMEvent() throws IOException {
+    return ShuffleUtils.generateVMEvent(outputContext, this.sizePerPartition,
+        this.reportDetailedPartitionStats());
   }
 
   private Event generateDMEvent() throws IOException {
@@ -667,7 +656,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     if (currentBuffer.nextPosition == 0) {
       if (pipelinedShuffle) {
         List<Event> eventList = Lists.newLinkedList();
-        eventList.add(generateVMEvent(new long[numPartitions]));
+        eventList.add(ShuffleUtils.generateVMEvent(outputContext,
+            reportPartitionStats() ? new long[numPartitions] : null,
+                reportDetailedPartitionStats()));
         //Send final event with all empty partitions and null path component.
         BitSet emptyPartitions = new BitSet(numPartitions);
         emptyPartitions.flip(0, numPartitions);
@@ -844,7 +835,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
             writer.append(key, value);
             outputLargeRecordsCounter.increment(1);
             numRecordsPerPartition[i]++;
-            sizePerPartition[i] += writer.getRawLength();
+            if (reportPartitionStats()) {
+              sizePerPartition[i] += writer.getRawLength();
+            }
             writer.close();
             additionalSpillBytesWritternCounter.increment(writer.getCompressedLength());
             TezIndexRecord indexRecord = new TezIndexRecord(recordStart, writer.getRawLength(),
@@ -985,7 +978,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     try {
       String pathComponent = (outputContext.getUniqueIdentifier() + "_" + spillNumber);
       if (isFinalUpdate) {
-        eventList.add(generateVMEvent(sizePerPartition));
+        eventList.add(ShuffleUtils.generateVMEvent(outputContext,
+            sizePerPartition, reportDetailedPartitionStats()));
       }
       Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate,
           pathComponent, emptyPartitions);

http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index c0b0760..9a3d778 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -200,7 +200,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
       ShuffleUtils.generateEventOnSpill(eventList, finalMergeEnabled, isLastEvent,
           getContext(), 0, new TezSpillRecord(sorter.getFinalIndexFile(), conf),
           getNumPhysicalOutputs(), sendEmptyPartitionDetails, getContext().getUniqueIdentifier(),
-          sorter.getPartitionStats());
+          sorter.getPartitionStats(), sorter.reportDetailedPartitionStats());
     }
     return eventList;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 879c2e0..4f74f7d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -172,6 +172,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS);
   }
 
   // TODO Maybe add helper methods to extract keys

http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 90c0ed4..c4b3b22 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -145,6 +145,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS);
   }
 
   // TODO Maybe add helper methods to extract keys

http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/proto/ShufflePayloads.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
index 9b0fc16..f78cbac 100644
--- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto
+++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
@@ -41,9 +41,16 @@ message InputInformationEventPayloadProto {
   optional int32 partition_range = 1;
 }
 
+// DetailedPartitionStatsProto represents size of a list of partitions.
+// It is more accurate than the partition_stats.
+message DetailedPartitionStatsProto {
+  repeated int32 size_in_mb = 1;
+}
+
 message VertexManagerEventPayloadProto {
   optional int64 output_size = 1;
   optional bytes partition_stats = 2;
+  optional DetailedPartitionStatsProto detailed_partition_stats = 3;
 }
 
 message ShuffleEdgeManagerConfigPayloadProto {

http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index c542030..4233f5d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -161,8 +161,9 @@ public class TestShuffleUtils {
     int spillId = 0;
     int physicalOutputs = 10;
     String pathComponent = "/attempt_x_y_0/file.out";
-    ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext,
-        spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent, null);
+    ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
+        outputContext, spillId, new TezSpillRecord(indexFile, conf),
+            physicalOutputs, true, pathComponent, null, false);
 
     Assert.assertTrue(events.size() == 1);
     Assert.assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
@@ -199,8 +200,9 @@ public class TestShuffleUtils {
     String pathComponent = "/attempt_x_y_0/file.out";
 
     //normal code path where we do final merge all the time
-    ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext,
-        spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent, null);
+    ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
+        outputContext, spillId, new TezSpillRecord(indexFile, conf),
+            physicalOutputs, true, pathComponent, null, false);
 
     Assert.assertTrue(events.size() == 2); //one for VM
     Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
@@ -239,8 +241,9 @@ public class TestShuffleUtils {
     String pathComponent = "/attempt_x_y_0/file.out";
 
     //normal code path where we do final merge all the time
-    ShuffleUtils.generateEventOnSpill(events, finalMergeDisabled, isLastEvent, outputContext,
-        spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent, null);
+    ShuffleUtils.generateEventOnSpill(events, finalMergeDisabled, isLastEvent,
+        outputContext, spillId, new TezSpillRecord(indexFile, conf),
+            physicalOutputs, true, pathComponent, null, false);
 
     Assert.assertTrue(events.size() == 2); //one for VM
     Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);

http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 70819e5..80e7b14 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -40,6 +40,7 @@ import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.OutputStatisticsReporter;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
@@ -404,10 +405,11 @@ public class TestPipelinedSorter {
     writeData(sorter, numKeys, keySize);
 
     //partition stats;
-    boolean partitionStats = conf.getBoolean(TezRuntimeConfiguration
-        .TEZ_RUNTIME_REPORT_PARTITION_STATS, TezRuntimeConfiguration
-        .TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT);
-    if (partitionStats) {
+    ReportPartitionStats partitionStats =
+        ReportPartitionStats.fromString(conf.get(
+        TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT));
+    if (partitionStats.isEnabled()) {
       assertTrue(sorter.getPartitionStats() != null);
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 9d2b615..41b2b97 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -79,6 +79,7 @@ import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.library.api.Partitioner;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
@@ -115,14 +116,28 @@ public class TestUnorderedPartitionedKVWriter {
   private static FileSystem localFs;
 
   private boolean shouldCompress;
+  private ReportPartitionStats reportPartitionStats;
 
-  public TestUnorderedPartitionedKVWriter(boolean shouldCompress) {
+  public TestUnorderedPartitionedKVWriter(boolean shouldCompress,
+      ReportPartitionStats reportPartitionStats) {
     this.shouldCompress = shouldCompress;
+    this.reportPartitionStats = reportPartitionStats;
   }
 
-  @Parameters
+  @SuppressWarnings("deprecation")
+  @Parameterized.Parameters(name = "test[{0}, {1}, {2}]")
   public static Collection<Object[]> data() {
-    Object[][] data = new Object[][] { { false }, { true } };
+    Object[][] data = new Object[][] {
+        { false, ReportPartitionStats.DISABLED },
+        { false, ReportPartitionStats.ENABLED },
+        { false, ReportPartitionStats.NONE },
+        { false, ReportPartitionStats.MEMORY_OPTIMIZED },
+        { false, ReportPartitionStats.PRECISE },
+        { true, ReportPartitionStats.DISABLED },
+        { true, ReportPartitionStats.ENABLED },
+        { true, ReportPartitionStats.NONE },
+        { true, ReportPartitionStats.MEMORY_OPTIMIZED },
+        { true, ReportPartitionStats.PRECISE }};
     return Arrays.asList(data);
   }
 
@@ -415,36 +430,54 @@ public class TestUnorderedPartitionedKVWriter {
     assertEquals(0, expectedValues.size());
   }
 
-  private long[] getPartitionStats(
-      VertexManagerEvent vme) throws IOException {
+  private int[] getPartitionStats(VertexManagerEvent vme) throws IOException {
     RoaringBitmap partitionStats = new RoaringBitmap();
     ShuffleUserPayloads.VertexManagerEventPayloadProto
         payload = ShuffleUserPayloads.VertexManagerEventPayloadProto
         .parseFrom(ByteString.copyFrom(vme.getUserPayload()));
-    assertTrue(payload.hasPartitionStats());
-    ByteString compressedPartitionStats = payload.getPartitionStats();
-    byte[] rawData = TezCommonUtils.decompressByteStringToByteArray(
-        compressedPartitionStats);
-    ByteArrayInputStream bin = new ByteArrayInputStream(rawData);
-    partitionStats.deserialize(new DataInputStream(bin));
-    long[] stats = new long[partitionStats.getCardinality()];
-    Iterator<Integer> it = partitionStats.iterator();
-    final DATA_RANGE_IN_MB[] RANGES = DATA_RANGE_IN_MB.values();
-    final int RANGE_LEN = RANGES.length;
-    while (it.hasNext()) {
-      int pos = it.next();
-      int index = ((pos) / RANGE_LEN);
-      int rangeIndex = ((pos) % RANGE_LEN);
-      if (RANGES[rangeIndex].getSizeInMB() > 0) {
-        stats[index] += RANGES[rangeIndex].getSizeInMB();
+    if (!reportPartitionStats.isEnabled()) {
+      assertFalse(payload.hasPartitionStats());
+      assertFalse(payload.hasDetailedPartitionStats());
+      return null;
+    }
+    if (reportPartitionStats.isPrecise()) {
+      assertTrue(payload.hasDetailedPartitionStats());
+      List<Integer> sizeInMBList =
+          payload.getDetailedPartitionStats().getSizeInMbList();
+      int[] stats = new int[sizeInMBList.size()];
+      for (int i=0; i<sizeInMBList.size(); i++) {
+        stats[i] += sizeInMBList.get(i);
       }
+      return stats;
+    } else {
+      assertTrue(payload.hasPartitionStats());
+      ByteString compressedPartitionStats = payload.getPartitionStats();
+      byte[] rawData = TezCommonUtils.decompressByteStringToByteArray(
+          compressedPartitionStats);
+      ByteArrayInputStream bin = new ByteArrayInputStream(rawData);
+      partitionStats.deserialize(new DataInputStream(bin));
+      int[] stats = new int[partitionStats.getCardinality()];
+      Iterator<Integer> it = partitionStats.iterator();
+      final DATA_RANGE_IN_MB[] RANGES = DATA_RANGE_IN_MB.values();
+      final int RANGE_LEN = RANGES.length;
+      while (it.hasNext()) {
+        int pos = it.next();
+        int index = ((pos) / RANGE_LEN);
+        int rangeIndex = ((pos) % RANGE_LEN);
+        if (RANGES[rangeIndex].getSizeInMB() > 0) {
+          stats[index] += RANGES[rangeIndex].getSizeInMB();
+        }
+      }
+      return stats;
     }
-    return stats;
   }
 
   private void verifyPartitionStats(VertexManagerEvent vme,
       BitSet expectedPartitionsWithData) throws IOException {
-    long[] stats = getPartitionStats(vme);
+    int[] stats = getPartitionStats(vme);
+    if (stats == null) {
+      return;
+    }
     for (int i = 0; i < stats.length; i++) {
       // The stats should be greater than zero if and only if
       // the partition has data
@@ -922,6 +955,8 @@ public class TestUnorderedPartitionedKVWriter {
       conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC,
           DefaultCodec.class.getName());
     }
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
+        reportPartitionStats.getType());
     return conf;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
index fabf52d..9d6ca50 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats;
 import org.junit.Test;
 
 public class TestOrderedPartitionedKVEdgeConfig {
@@ -132,7 +133,8 @@ public class TestOrderedPartitionedKVEdgeConfig {
     additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111");
     additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, "0.22f");
     additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, "CustomSorter");
-    additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, "true");
+    additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
+        ReportPartitionStats.MEMORY_OPTIMIZED.getType());
     additionalConfs.put("file.shouldExist", "file");
 
     OrderedPartitionedKVEdgeConfig.Builder builder = OrderedPartitionedKVEdgeConfig
@@ -173,9 +175,11 @@ public class TestOrderedPartitionedKVEdgeConfig {
     assertEquals("io", outputConf.get("io.shouldExist"));
     assertEquals("file", outputConf.get("file.shouldExist"));
     assertEquals("fs", outputConf.get("fs.shouldExist"));
-    assertEquals(true, outputConf.getBoolean(TezRuntimeConfiguration
-            .TEZ_RUNTIME_REPORT_PARTITION_STATS,
+    ReportPartitionStats partitionStats =
+        ReportPartitionStats.fromString(outputConf.get(
+        TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
         TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT));
+    assertEquals(true, partitionStats.isEnabled());
 
 
     assertEquals(3, inputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, 0));

http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index 8942f4b..93c4f92 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -38,6 +38,7 @@ import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.api.KeyValuesWriter;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
 import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
@@ -98,6 +99,7 @@ public class TestOnFileSortedOutput {
   private boolean sendEmptyPartitionViaEvent;
   //Partition index for which data should not be written to.
   private int emptyPartitionIdx;
+  private ReportPartitionStats reportPartitionStats;
 
   /**
    * Constructor
@@ -107,13 +109,14 @@ public class TestOnFileSortedOutput {
    * @param sorterThreads number of threads needed for sorter (required only for pipelined sorter)
    * @param emptyPartitionIdx for which data should not be generated
    */
-  public TestOnFileSortedOutput(boolean sendEmptyPartitionViaEvent, SorterImpl sorterImpl,
-      int sorterThreads, int emptyPartitionIdx) throws IOException {
+  public TestOnFileSortedOutput(boolean sendEmptyPartitionViaEvent,
+      SorterImpl sorterImpl, int sorterThreads, int emptyPartitionIdx,
+      ReportPartitionStats reportPartitionStats) throws IOException {
     this.sendEmptyPartitionViaEvent = sendEmptyPartitionViaEvent;
     this.emptyPartitionIdx = emptyPartitionIdx;
     this.sorterImpl = sorterImpl;
     this.sorterThreads = sorterThreads;
-
+    this.reportPartitionStats = reportPartitionStats;
     conf = new Configuration();
 
     workingDir = new Path(".", this.getClass().getName());
@@ -135,7 +138,8 @@ public class TestOnFileSortedOutput {
 
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
         sendEmptyPartitionViaEvent);
-
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
+        reportPartitionStats.getType());
     outputSize.set(0);
     numRecords.set(0);
     fs.mkdirs(workingDir);
@@ -147,27 +151,39 @@ public class TestOnFileSortedOutput {
     fs.delete(workingDir, true);
   }
 
-  @Parameterized.Parameters(name = "test[{0}, {1}, {2}, {3}]")
+  @SuppressWarnings("deprecation")
+  @Parameterized.Parameters(name = "test[{0}, {1}, {2}, {3}, {4}]")
   public static Collection<Object[]> getParameters() {
     Collection<Object[]> parameters = new ArrayList<Object[]>();
-    //empty_partition_via_events_enabled, noOfSortThreads, partitionToBeEmpty
-    parameters.add(new Object[] { false, SorterImpl.LEGACY, 1, -1 });
-    parameters.add(new Object[] { false, SorterImpl.LEGACY, 1, 0 });
-    parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, -1 });
-    parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, 0 });
+    //empty_partition_via_events_enabled, noOfSortThreads,
+    // partitionToBeEmpty, reportPartitionStats
+    parameters.add(new Object[] { false, SorterImpl.LEGACY, 1, -1,
+        ReportPartitionStats.ENABLED });
+    parameters.add(new Object[] { false, SorterImpl.LEGACY, 1, 0,
+        ReportPartitionStats.ENABLED  });
+    parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, -1,
+        ReportPartitionStats.ENABLED  });
+    parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, 0,
+        ReportPartitionStats.ENABLED  });
+    parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, 0,
+        ReportPartitionStats.PRECISE  });
 
     //Pipelined sorter
-    parameters.add(new Object[] { false, SorterImpl.PIPELINED, 2, -1 });
-    parameters.add(new Object[] { false, SorterImpl.PIPELINED, 2, 0 });
-    parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, -1 });
-    parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, 0 });
-
+    parameters.add(new Object[] { false, SorterImpl.PIPELINED, 2, -1,
+        ReportPartitionStats.ENABLED  });
+    parameters.add(new Object[] { false, SorterImpl.PIPELINED, 2, 0,
+        ReportPartitionStats.ENABLED  });
+    parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, -1,
+        ReportPartitionStats.ENABLED  });
+    parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, 0,
+        ReportPartitionStats.ENABLED  });
+    parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, 0,
+        ReportPartitionStats.PRECISE  });
     return parameters;
   }
 
   private void startSortedOutput(int partitions) throws Exception {
     OutputContext context = createTezOutputContext();
-    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, true);
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
     UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
     doReturn(payLoad).when(context).getUserPayload();
@@ -299,7 +315,11 @@ public class TestOnFileSortedOutput {
         .parseFrom(
             ByteString.copyFrom(((VertexManagerEvent) eventList.get(0)).getUserPayload()));
 
-    assertTrue(vmPayload.hasPartitionStats());
+    if (reportPartitionStats.isPrecise()) {
+      assertTrue(vmPayload.hasDetailedPartitionStats());
+    } else {
+      assertTrue(vmPayload.hasPartitionStats());
+    }
     assertEquals(HOST, payload.getHost());
     assertEquals(PORT, payload.getPort());
     assertEquals(UniqueID, payload.getPathComponent());