You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/05/29 01:45:55 UTC

tez git commit: TEZ-2198. Fix sorter spill counts (rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/master b9056657a -> 42b7756eb


TEZ-2198. Fix sorter spill counts (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/42b7756e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/42b7756e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/42b7756e

Branch: refs/heads/master
Commit: 42b7756ebea108f81dbf633c847e2f70cd099e0f
Parents: b905665
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Fri May 29 05:16:46 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Fri May 29 05:16:46 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/common/counters/TaskCounter.java | 11 ++-
 .../common/sort/impl/ExternalSorter.java        | 21 +++++-
 .../common/sort/impl/PipelinedSorter.java       | 54 +++++++++-----
 .../common/sort/impl/dflt/DefaultSorter.java    | 76 ++++++++++----------
 .../common/sort/impl/TestPipelinedSorter.java   | 58 ++++++++++++++-
 .../sort/impl/dflt/TestDefaultSorter.java       | 43 +++++++++--
 .../apache/tez/test/TestPipelinedShuffle.java   |  2 +-
 8 files changed, 197 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/42b7756e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ece14a3..7f240c1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.
 
 ALL CHANGES:
+  TEZ-2198. Fix sorter spill counts.
   TEZ-1883. Change findbugs version to 3.x.
   TEZ-2440. Sorter should check for indexCacheList.size() in flush().
   TEZ-2490. TEZ-2450 breaks Hadoop 2.2 and 2.4 compatability.

http://git-wip-us.apache.org/repos/asf/tez/blob/42b7756e/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index 128b067..7dcdf8a 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -131,10 +131,19 @@ public enum TaskCounter {
   ADDITIONAL_SPILLS_BYTES_READ,
   
   /**
-   * Actual number of unnecessary spills. (lac of adequate memory)
+   * Spills that were generated & read by the same task (unnecessary spills due to lack of
+   * adequate memory).
+   *
    * Used by OnFileSortedOutput
    */
   ADDITIONAL_SPILL_COUNT,
+
+  /**
+   * Number of spill files being offered via shuffle-handler.
+   * E.g., without pipelined shuffle, this would be 1. With pipelined shuffle, this could be many
+   * as final merge is avoided.
+   */
+  SHUFFLE_CHUNK_COUNT,
   
   INPUT_GROUPS, // Not used at the moment. Will eventually replace REDUCE_INPUT_GROUPS
 

http://git-wip-us.apache.org/repos/asf/tez/blob/42b7756e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index 40d22fe..27fe37a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
 import java.util.Iterator;
 import java.util.Map;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.slf4j.Logger;
@@ -118,6 +119,9 @@ public abstract class ExternalSorter {
 
   protected final boolean cleanup;
 
+  protected final boolean finalMergeEnabled;
+  protected final boolean sendEmptyPartitionDetails;
+
   // Counters
   // MR compatilbity layer needs to rename counters back to what MR requries.
 
@@ -141,9 +145,10 @@ public abstract class ExternalSorter {
   protected final TezCounter additionalSpillBytesWritten;
   
   protected final TezCounter additionalSpillBytesRead;
-  // Number of additional spills. (This will be 0 if there's no additional
-  // spills)
+  // Number of spills written & consumed by the same task to generate the final file
   protected final TezCounter numAdditionalSpills;
+  // Number of files offered via shuffle-handler to consumers.
+  protected final TezCounter numShuffleChunks;
 
   public ExternalSorter(OutputContext outputContext, Configuration conf, int numOutputs,
       long initialMemoryAvailable) throws IOException {
@@ -187,6 +192,7 @@ public abstract class ExternalSorter {
     additionalSpillBytesWritten = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
     additionalSpillBytesRead = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
     numAdditionalSpills = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
+    numShuffleChunks = outputContext.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT);
 
     // compression
     if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
@@ -235,6 +241,17 @@ public abstract class ExternalSorter {
     this.conf.setInt(TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, this.partitions);
     this.partitioner = TezRuntimeUtils.instantiatePartitioner(this.conf);
     this.combiner = TezRuntimeUtils.instantiateCombiner(this.conf, outputContext);
+    this.finalMergeEnabled = conf.getBoolean(
+        TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT);
+    this.sendEmptyPartitionDetails = conf.getBoolean(
+        TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
+        TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
+  }
+
+  @VisibleForTesting
+  public boolean isFinalMergeEnabled() {
+    return finalMergeEnabled;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/42b7756e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 9113fca..6e4d72e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -111,8 +111,6 @@ public class PipelinedSorter extends ExternalSorter {
   private int indexCacheMemoryLimit;
 
   private final boolean pipelinedShuffle;
-  private final boolean finalMergeEnabled;
-  private final boolean sendEmptyPartitionDetails;
 
   // TODO Set additional countesr - total bytes written, spills etc.
 
@@ -127,20 +125,11 @@ public class PipelinedSorter extends ExternalSorter {
     
     partitionBits = bitcount(partitions)+1;
 
-    finalMergeEnabled = conf.getBoolean(
-        TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT,
-        TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT);
-
     boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
         .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration
         .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
 
-    sendEmptyPartitionDetails = conf.getBoolean(
-        TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
-        TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
-
-
-    pipelinedShuffle = !finalMergeEnabled && confPipelinedShuffle;
+    pipelinedShuffle = !isFinalMergeEnabled() && confPipelinedShuffle;
 
     //sanity checks
     final long sortmb = this.availableMemoryMb;
@@ -157,7 +146,7 @@ public class PipelinedSorter extends ExternalSorter {
     int numberOfBlocks = Math.max(1, (int) Math.ceil(1.0 * usage / blockSize));
     LOG.info("Number of Blocks : " + numberOfBlocks
         + ", maxMemUsage=" + maxMemUsage + ", BLOCK_SIZE=" + blockSize + ", finalMergeEnabled="
-        + finalMergeEnabled + ", pipelinedShuffle=" + pipelinedShuffle + ", "
+        + isFinalMergeEnabled() + ", pipelinedShuffle=" + pipelinedShuffle + ", "
         + "sendEmptyPartitionDetails=" + sendEmptyPartitionDetails);
     long totalCapacityWithoutMeta = 0;
     for (int i = 0; i < numberOfBlocks; i++) {
@@ -235,7 +224,7 @@ public class PipelinedSorter extends ExternalSorter {
       if (pipelinedShuffle) {
         List<Event> events = Lists.newLinkedList();
         String pathComponent = (outputContext.getUniqueIdentifier() + "_" + (numSpills-1));
-        ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, false, outputContext,
+        ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false, outputContext,
             (numSpills - 1), indexCacheList.get(numSpills - 1), partitions, sendEmptyPartitionDetails,
             pathComponent);
         outputContext.sendEvents(events);
@@ -343,6 +332,21 @@ public class PipelinedSorter extends ExternalSorter {
     mapOutputByteCounter.increment(valend - keystart);
   }
 
+  private void adjustSpillCounters(long rawLength, long compLength) {
+    if (!isFinalMergeEnabled()) {
+      outputBytesWithOverheadCounter.increment(rawLength);
+    } else {
+      if (numSpills > 0) {
+        additionalSpillBytesWritten.increment(compLength);
+        // Reset; the value will be set during the final merge.
+        outputBytesWithOverheadCounter.setValue(0);
+      } else {
+        // Set this up for the first write only. Subsequent ones will be handled in the final merge.
+        outputBytesWithOverheadCounter.increment(rawLength);
+      }
+    }
+  }
+
   public void spill() throws IOException {
     // create spill file
     final long size = capacity +
@@ -381,6 +385,7 @@ public class PipelinedSorter extends ExternalSorter {
         }
         //close
         writer.close();
+        adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength());
 
         // record offsets
         final TezIndexRecord rec = 
@@ -399,6 +404,11 @@ public class PipelinedSorter extends ExternalSorter {
       //TODO: honor cache limits
       indexCacheList.add(spillRec);
       ++numSpills;
+      if (!isFinalMergeEnabled()) {
+        fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen());
+        //No final merge. Set the number of files offered via shuffle-handler
+        numShuffleChunks.setValue(numSpills);
+      }
     } finally {
       out.close();
     }
@@ -440,14 +450,13 @@ public class PipelinedSorter extends ExternalSorter {
       //safe to clean up
       bufferList.clear();
 
-      numAdditionalSpills.increment(numSpills - 1);
 
       if(indexCacheList.isEmpty()) {
         LOG.warn("Index list is empty... returning");
         return;
       }
 
-      if (!finalMergeEnabled) {
+      if (!isFinalMergeEnabled()) {
         //Generate events for all spills
         List<Event> events = Lists.newLinkedList();
 
@@ -459,16 +468,17 @@ public class PipelinedSorter extends ExternalSorter {
           boolean isLastEvent = (i == numSpills - 1);
 
           String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i);
-          ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
+          ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
               outputContext, i, indexCacheList.get(i), partitions,
               sendEmptyPartitionDetails, pathComponent);
           LOG.info("Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
         }
         outputContext.sendEvents(events);
-        //No need to generate final merge
         return;
       }
 
+      numAdditionalSpills.increment(numSpills - 1);
+
       //In case final merge is required, the following code path is executed.
       if (numSpills == 1) {
         // someday be able to pass this directly to shuffle
@@ -485,6 +495,8 @@ public class PipelinedSorter extends ExternalSorter {
               + "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" +
               indexFilename);
         }
+        numShuffleChunks.setValue(numSpills);
+        fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
         return;
       }
 
@@ -531,7 +543,7 @@ public class PipelinedSorter extends ExternalSorter {
             new Path(uniqueIdentifier),
             (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf),
             nullProgressable, sortSegments, true,
-            null, spilledRecordsCounter, null,
+            null, spilledRecordsCounter, additionalSpillBytesRead,
             null); // Not using any Progress in TezMerger. Should just work.
 
         //write merged output to disk
@@ -548,6 +560,7 @@ public class PipelinedSorter extends ExternalSorter {
 
         //close
         writer.close();
+        outputBytesWithOverheadCounter.increment(writer.getRawLength());
 
         // record offsets
         final TezIndexRecord rec =
@@ -558,6 +571,9 @@ public class PipelinedSorter extends ExternalSorter {
         spillRec.putIndex(rec, parts);
       }
 
+      numShuffleChunks.setValue(1); //final merge has happened.
+      fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
+
       spillRec.writeToFile(finalIndexFile, conf);
       finalOut.close();
       for (int i = 0; i < numSpills; i++) {

http://git-wip-us.apache.org/repos/asf/tez/blob/42b7756e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index afe07f0..ebf40f3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -120,8 +120,6 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
   private long totalKeys = 0;
   private long sameKey = 0;
 
-  private final boolean finalMergeEnabled;
-  private final boolean sendEmptyPartitionDetails;
 
   public DefaultSorter(OutputContext outputContext, Configuration conf, int numOutputs,
       long initialMemoryAvailable) throws IOException {
@@ -139,10 +137,6 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     indexCacheMemoryLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES,
                                        TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES_DEFAULT);
 
-    finalMergeEnabled = conf.getBoolean(
-        TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT,
-        TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT);
-
     boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
         .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration
         .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
@@ -152,10 +146,6 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
           + "with DefaultSorter. It is supported only with PipelinedSorter.");
     }
 
-    sendEmptyPartitionDetails = conf.getBoolean(
-        TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
-        TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
-
     // buffers and accounting
     int maxMemUsage = sortmb << 20;
     maxMemUsage -= maxMemUsage % METASIZE;
@@ -175,7 +165,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
       LOG.info(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + ": " + sortmb);
       LOG.info("soft limit at " + softLimit);
       LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
-      LOG.info("kvstart = " + kvstart + "; length = " + maxRec + "; finalMergeEnabled = " + finalMergeEnabled);
+      LOG.info("kvstart = " + kvstart + "; length = " + maxRec + "; finalMergeEnabled = " + isFinalMergeEnabled());
     }
 
     // k/v serialization
@@ -716,7 +706,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
       cleanup();
       Thread.currentThread().interrupt();
     }
-    if (finalMergeEnabled) {
+    if (isFinalMergeEnabled()) {
       fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
     }
   }
@@ -818,6 +808,22 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     spill(mstart, mend);
   }
 
+  private void adjustSpillCounters(long rawLen, long compLength) {
+    if (!isFinalMergeEnabled()) {
+      outputBytesWithOverheadCounter.increment(rawLen);
+    } else {
+      if (numSpills > 0) {
+        additionalSpillBytesWritten.increment(compLength);
+        numAdditionalSpills.increment(1);
+        // Reset; the value will be set during the final merge.
+        outputBytesWithOverheadCounter.setValue(0);
+      } else {
+        // Set this up for the first write only. Subsequent ones will be handled in the final merge.
+        outputBytesWithOverheadCounter.increment(rawLen);
+      }
+    }
+  }
+
   protected void spill(int mstart, int mend)
       throws IOException, InterruptedException {
 
@@ -879,15 +885,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
 
           // close the writer
           writer.close();
-          if (numSpills > 0) {
-            additionalSpillBytesWritten.increment(writer.getCompressedLength());
-            numAdditionalSpills.increment(1);
-            // Reset the value will be set during the final merge.
-            outputBytesWithOverheadCounter.setValue(0);
-          } else {
-            // Set this up for the first write only. Subsequent ones will be handled in the final merge.
-            outputBytesWithOverheadCounter.increment(writer.getRawLength());
-          }
+          adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength());
           // record offsets
           final TezIndexRecord rec =
               new TezIndexRecord(
@@ -916,6 +914,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
       }
       LOG.info("Finished spill " + numSpills);
       ++numSpills;
+      if (!isFinalMergeEnabled()) {
+        numShuffleChunks.setValue(numSpills);
+      }
     } finally {
       if (out != null) out.close();
     }
@@ -956,14 +957,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
           }
           writer.close();
 
-          if (numSpills > 0) {
-            additionalSpillBytesWritten.increment(writer.getCompressedLength());
-            numAdditionalSpills.increment(1);
-            outputBytesWithOverheadCounter.setValue(0);
-          } else {
-            // Set this up for the first write only. Subsequent ones will be handled in the final merge.
-            outputBytesWithOverheadCounter.increment(writer.getRawLength());
-          }
+          adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength());
 
           // record offsets
           TezIndexRecord rec =
@@ -992,6 +986,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
           spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
       }
       ++numSpills;
+      if (!isFinalMergeEnabled()) {
+        numShuffleChunks.setValue(numSpills);
+      }
     } finally {
       if (out != null) out.close();
     }
@@ -1085,13 +1082,13 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
 
   private void maybeSendEventForSpill(List<Event> events, boolean isLastEvent,
       TezSpillRecord spillRecord, int index, boolean sendEvent) throws IOException {
-    if (finalMergeEnabled) {
+    if (isFinalMergeEnabled()) {
       return;
     }
     Preconditions.checkArgument(spillRecord != null, "Spill record can not be null");
 
     String pathComponent = (outputContext.getUniqueIdentifier() + "_" + index);
-    ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
+    ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
         outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent);
 
     LOG.info("Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index);
@@ -1102,7 +1099,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
   }
 
   private void maybeAddEventsForSpills() throws IOException {
-    if (finalMergeEnabled) {
+    if (isFinalMergeEnabled()) {
       return;
     }
     List<Event> events = Lists.newLinkedList();
@@ -1124,7 +1121,6 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
       maybeSendEventForSpill(events, (i == numSpills - 1), spillRecord, i, false);
       fileOutputByteCounter.increment(rfs.getFileStatus(spillFilePaths.get(i)).getLen());
     }
-
     outputContext.sendEvents(events);
   }
 
@@ -1140,7 +1136,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
       finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
     }
     if (numSpills == 1) { //the spill is the final output
-      if (finalMergeEnabled) {
+      if (isFinalMergeEnabled()) {
         finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename[0]);
         finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]);
         sameVolRename(filename[0], finalOutputFile);
@@ -1160,6 +1156,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         fileOutputByteCounter.increment(rfs.getFileStatus(spillFilePaths.get(0)).getLen());
         //No need to populate finalIndexFile, finalOutputFile etc when finalMerge is disabled
       }
+      numShuffleChunks.setValue(numSpills);
       return;
     }
 
@@ -1170,7 +1167,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     }
 
     //Check if it is needed to do final merge. Or else, exit early.
-    if (numSpills > 0 && !finalMergeEnabled) {
+    if (numSpills > 0 && !isFinalMergeEnabled()) {
       maybeAddEventsForSpills();
       //No need to do final merge.
       return;
@@ -1181,7 +1178,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
     finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
 
-    if (finalMergeEnabled) {
+    if (isFinalMergeEnabled()) {
       finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize);
       finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
     } else if (numSpills == 0) {
@@ -1219,12 +1216,13 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
       } finally {
         finalOut.close();
       }
-
-      if (!finalMergeEnabled) {
+      ++numSpills;
+      if (!isFinalMergeEnabled()) {
         List<Event> events = Lists.newLinkedList();
         maybeSendEventForSpill(events, true, sr, 0, true);
         fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
       }
+      numShuffleChunks.setValue(numSpills);
       return;
     }
     else {
@@ -1277,6 +1275,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
           runCombineProcessor(kvIter, writer);
         }
         writer.close();
+        outputBytesWithOverheadCounter.increment(writer.getRawLength());
 
         // record offsets
         final TezIndexRecord rec =
@@ -1286,6 +1285,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
                 writer.getCompressedLength());
         spillRec.putIndex(rec, parts);
       }
+      numShuffleChunks.setValue(1); //final merge has happened
       spillRec.writeToFile(finalIndexFile, conf);
       finalOut.close();
       for(int i = 0; i < numSpills; i++) {

http://git-wip-us.apache.org/repos/asf/tez/blob/42b7756e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 5de96c9..8bf91ce 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -13,6 +13,8 @@ import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.ExecutionContext;
@@ -60,7 +62,7 @@ import static org.mockito.internal.verification.VerificationModeFactory.times;
  * limitations under the License.
  */
 public class TestPipelinedSorter {
-  private static final Configuration conf = new Configuration();
+  private static Configuration conf = new Configuration();
   private static FileSystem localFs = null;
   private static Path workDir = null;
   private OutputContext outputContext;
@@ -114,6 +116,7 @@ public class TestPipelinedSorter {
   public void reset() throws IOException {
     cleanup();
     localFs.mkdirs(workDir);
+    conf = new Configuration();
   }
 
   @Test
@@ -124,6 +127,17 @@ public class TestPipelinedSorter {
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
     basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20);
 
+  }
+
+  @Test
+  public void testWithEmptyData() throws IOException {
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
+    //# partition, # of keys, size per key, InitialMem, blockSize
+    basicTest(1, 0, 0, (10 * 1024l * 1024l), 3 << 20);
+  }
+
+  @Test
+  public void basicTestWithSmallBlockSize() throws IOException {
     try {
       //3 MB key & 3 MB value, whereas block size is just 3 MB
       basicTest(1, 5, (3 << 20), (10 * 1024l * 1024l), 3 << 20);
@@ -133,11 +147,13 @@ public class TestPipelinedSorter {
           ioe.getMessage().contains("Record too large for in-memory buffer."
               + " Exceeded buffer overflow limit"));
     }
+  }
 
+  @Test
+  public void testWithLargeKeyValue() throws IOException {
     //15 MB key & 15 MB value, 48 MB sort buffer.  block size is 48MB (or 1 block)
     //meta would be 16 MB
     basicTest(1, 5, (15 << 20), (48 * 1024l * 1024l), 48 << 20);
-
   }
 
   @Test
@@ -154,7 +170,7 @@ public class TestPipelinedSorter {
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
     PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
-        initialAvailableMem, 1<<20);
+        initialAvailableMem, 1 << 20);
 
     //Write 100 keys each of size 10
     writeData(sorter, 10000, 100);
@@ -172,6 +188,7 @@ public class TestPipelinedSorter {
 
     writeData(sorter, numKeys, keySize);
 
+    verifyCounters(sorter, outputContext);
     Path outputFile = sorter.finalOutputFile;
     FileSystem fs = outputFile.getFileSystem(conf);
 
@@ -181,6 +198,41 @@ public class TestPipelinedSorter {
     reader.close();
   }
 
+  private void verifyCounters(PipelinedSorter sorter, OutputContext context) {
+    TezCounter numShuffleChunks = context.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT);
+    TezCounter additionalSpills =
+        context.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
+    TezCounter additionalSpillBytesWritten =
+        context.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
+    TezCounter additionalSpillBytesRead =
+        context.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
+
+    if (sorter.isFinalMergeEnabled()) {
+      assertTrue(additionalSpills.getValue() == (sorter.getNumSpills() - 1));
+      //Number of files served by shuffle-handler
+      assertTrue(1 == numShuffleChunks.getValue());
+      if (sorter.getNumSpills() > 1) {
+        assertTrue(additionalSpillBytesRead.getValue() > 0);
+        assertTrue(additionalSpillBytesWritten.getValue() > 0);
+      }
+    } else {
+      assertTrue(0 == additionalSpills.getValue());
+      //Number of files served by shuffle-handler
+      assertTrue(sorter.getNumSpills() == numShuffleChunks.getValue());
+      assertTrue(additionalSpillBytesRead.getValue() == 0);
+      assertTrue(additionalSpillBytesWritten.getValue() == 0);
+    }
+
+    TezCounter finalOutputBytes =
+        context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
+    assertTrue(finalOutputBytes.getValue() > 0);
+
+    TezCounter outputBytesWithOverheadCounter = context.getCounters().findCounter
+        (TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
+    assertTrue(outputBytesWithOverheadCounter.getValue() > 0);
+  }
+
+
   @Test
   //Its not possible to allocate > 2 GB in test environment.  Carry out basic checks here.
   public void memTest() throws IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/42b7756e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index 70dce13..072eafc 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.api.Event;
@@ -171,6 +172,7 @@ public class TestDefaultSorter {
     try {
       writeData(sorter, 1000, 1000);
       assertTrue(sorter.getNumSpills() > 2);
+      verifyCounters(sorter, context);
     } catch(IOException ioe) {
       fail(ioe.getMessage());
     }
@@ -191,6 +193,7 @@ public class TestDefaultSorter {
       sorter.flush();
       sorter.close();
       assertTrue(sorter.getFinalOutputFile().getParent().getName().equalsIgnoreCase(UniqueID));
+      verifyCounters(sorter, context);
     } catch(Exception e) {
       fail();
     }
@@ -213,13 +216,13 @@ public class TestDefaultSorter {
       sorter.close();
       assertTrue(sorter.getFinalOutputFile().getParent().getName().equalsIgnoreCase(UniqueID +
           "_0"));
-      assertTrue(context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL).getValue() > 0);
+      verifyCounters(sorter, context);
     } catch(Exception e) {
       fail();
     }
   }
 
-  @Test(timeout = 30000)
+  @Test(timeout = 60000)
   @SuppressWarnings("unchecked")
   public void testWithSingleSpillWithFinalMergeDisabled() throws IOException {
     OutputContext context = createTezOutputContext();
@@ -245,10 +248,10 @@ public class TestDefaultSorter {
       }
     }
 
-    assertTrue(context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL).getValue() > 0);
+    verifyCounters(sorter, context);
   }
 
-  @Test(timeout = 30000)
+  @Test(timeout = 60000)
   @SuppressWarnings("unchecked")
   public void testWithMultipleSpillsWithFinalMergeDisabled() throws IOException {
     OutputContext context = createTezOutputContext();
@@ -277,7 +280,37 @@ public class TestDefaultSorter {
       }
     }
     assertTrue(spillIndex == spillCount);
-    assertTrue(context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL).getValue() > 0);
+    verifyCounters(sorter, context);
+  }
+
+  /**
+   * Verifies spill-related counters against the sorter's spill count and final-merge setting, and
+   * that both output-byte counters are positive. Each assertion message names the counter checked.
+   */
+  private void verifyCounters(DefaultSorter sorter, OutputContext context) {
+    TezCounters counters = context.getCounters();
+    long numSpills = sorter.getNumSpills();
+    long shuffleChunks = counters.findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT).getValue();
+    long additionalSpills = counters.findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT).getValue();
+    long spillBytesWritten = counters.findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN).getValue();
+    long spillBytesRead = counters.findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ).getValue();
+    if (sorter.isFinalMergeEnabled()) {
+      assertTrue("ADDITIONAL_SPILL_COUNT=" + additionalSpills + ", numSpills=" + numSpills, additionalSpills == numSpills - 1);
+      //Number of files served by shuffle-handler
+      assertTrue("SHUFFLE_CHUNK_COUNT=" + shuffleChunks + ", expected 1", shuffleChunks == 1);
+      if (numSpills > 1) {
+        assertTrue("ADDITIONAL_SPILLS_BYTES_READ=" + spillBytesRead + ", expected > 0", spillBytesRead > 0);
+        assertTrue("ADDITIONAL_SPILLS_BYTES_WRITTEN=" + spillBytesWritten + ", expected > 0", spillBytesWritten > 0);
+      }
+    } else {
+      assertTrue("ADDITIONAL_SPILL_COUNT=" + additionalSpills + ", expected 0", additionalSpills == 0);
+      //Number of files served by shuffle-handler
+      assertTrue("SHUFFLE_CHUNK_COUNT=" + shuffleChunks + ", numSpills=" + numSpills, shuffleChunks == numSpills);
+      assertTrue("ADDITIONAL_SPILLS_BYTES_READ=" + spillBytesRead + ", expected 0", spillBytesRead == 0);
+      assertTrue("ADDITIONAL_SPILLS_BYTES_WRITTEN=" + spillBytesWritten + ", expected 0", spillBytesWritten == 0);
+    }
+    assertTrue("OUTPUT_BYTES_PHYSICAL expected > 0", counters.findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL).getValue() > 0);
+    assertTrue("OUTPUT_BYTES_WITH_OVERHEAD expected > 0", counters.findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD).getValue() > 0);
   }
 
   private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/42b7756e/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java b/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java
index 25c149d..52342a2 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java
@@ -256,7 +256,7 @@ public class TestPipelinedShuffle {
       TezCounters counters = dagStatus.getDAGCounters();
 
       //Ensure that atleast 10 spills were there in this job.
-      assertTrue(counters.findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT).getValue() > 10);
+      assertTrue(counters.findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT).getValue() > 10);
 
       if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
         System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics());