Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/06 23:27:16 UTC

[2/2] git commit: TEZ-901. Improvements to Counters generated by runtime components. (sseth)

TEZ-901. Improvements to Counters generated by runtime components.
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/351a6105
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/351a6105
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/351a6105

Branch: refs/heads/master
Commit: 351a610589ef6f47bf00937c7f3289f4c4c1a204
Parents: 36e74ea
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Mar 6 14:26:58 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Mar 6 14:26:58 2014 -0800

----------------------------------------------------------------------
 .../apache/tez/common/counters/TaskCounter.java | 151 +++++++++++++++----
 .../org/apache/tez/mapreduce/input/MRInput.java |  48 +-----
 .../apache/tez/mapreduce/output/MROutput.java   |  54 +------
 .../apache/tez/mapreduce/processor/MRTask.java  |   9 +-
 .../mapreduce/processor/map/MapProcessor.java   |  13 --
 .../processor/reduce/ReduceProcessor.java       |  11 --
 .../processor/map/TestMapProcessor.java         |   2 +-
 .../broadcast/input/BroadcastKVReader.java      |  13 +-
 .../input/BroadcastShuffleManager.java          |  44 +++++-
 .../broadcast/output/FileBasedKVWriter.java     |  27 +++-
 .../runtime/library/common/ValuesIterator.java  |   2 +
 .../common/localshuffle/LocalShuffle.java       |   2 +-
 .../library/common/shuffle/impl/Fetcher.java    |   4 +-
 .../common/shuffle/impl/InMemoryReader.java     |   2 +-
 .../common/shuffle/impl/InMemoryWriter.java     |   4 +-
 .../common/shuffle/impl/MergeManager.java       |  76 ++++++++--
 .../library/common/shuffle/impl/Shuffle.java    |  21 ++-
 .../common/shuffle/impl/ShuffleScheduler.java   |  25 ++-
 .../common/sort/impl/ExternalSorter.java        |  41 +++--
 .../runtime/library/common/sort/impl/IFile.java |  46 ++++--
 .../common/sort/impl/PipelinedSorter.java       |   8 +-
 .../library/common/sort/impl/TezMerger.java     |  64 +++++---
 .../common/sort/impl/dflt/DefaultSorter.java    |  32 +++-
 .../library/input/ShuffledMergedInput.java      |   2 +-
 .../library/input/ShuffledUnorderedKVInput.java |   6 +-
 .../library/output/OnFileSortedOutput.java      |   2 +-
 .../runtime/library/shuffle/common/Fetcher.java |   2 +-
 .../library/shuffle/common/FetcherCallback.java |   4 +-
 .../library/common/sort/impl/TestIFile.java     |   6 +-
 29 files changed, 467 insertions(+), 254 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index b6fca27..47107c3 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -19,48 +19,143 @@
 package org.apache.tez.common.counters;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-// TODO TEZAM5 For MR compatibility, a conversion from tez.TaskCounters to
-// mapreduce.TaskCounters will likely be required somewhere.
-// Similarly for FileSystemCounters and others.
 
 // Counters used by Task classes
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
+@InterfaceAudience.Private
+
 public enum TaskCounter {
   // TODO Eventually, rename counters to be non-MR specific and map them to MR equivalent.
-  MAP_INPUT_RECORDS, 
-  MAP_OUTPUT_RECORDS,
-  MAP_SKIPPED_RECORDS,
-  MAP_OUTPUT_BYTES,
-  MAP_OUTPUT_MATERIALIZED_BYTES,
-  SPLIT_RAW_BYTES,
-  COMBINE_INPUT_RECORDS,
-  COMBINE_OUTPUT_RECORDS,
+
+  /**
+   * Number of Input Groups seen by ShuffledMergedInput.
+   * Alternatively, the number of Input Groups seen by a Reduce task.
+   */
   REDUCE_INPUT_GROUPS,
-  REDUCE_SHUFFLE_BYTES,
+
+  /**
+   * Number of records (across all Groups) seen by ShuffledMergedInput.
+   * Alternatively, the number of records seen by a ReduceProcessor.
+   */
   REDUCE_INPUT_RECORDS,
-  REDUCE_OUTPUT_RECORDS,
-  REDUCE_SKIPPED_GROUPS,
-  REDUCE_SKIPPED_RECORDS,
+  
+  REDUCE_OUTPUT_RECORDS, // Not used at the moment.
+  REDUCE_SKIPPED_GROUPS, // Not used at the moment.
+  REDUCE_SKIPPED_RECORDS, // Not used at the moment.
+  SPLIT_RAW_BYTES,
+  
+  COMBINE_INPUT_RECORDS,
+  COMBINE_OUTPUT_RECORDS, // Not used at the moment.
+
+  /**
+   * Number of records written to disk in case of OnFileSortedOutput.
+   * 
+   * Number of additional records written out to disk in case of
+   * ShuffledMergedInput; this represents the number of unnecessary spills to
+   * disk caused by lack of memory.
+   */
   SPILLED_RECORDS,
-  SHUFFLED_MAPS, 
-  FAILED_SHUFFLE,
+
+  /**
+   * Number of Inputs from which data is copied. Represents physical Inputs. 
+   */
+  NUM_SHUFFLED_INPUTS,
+
+  /**
+   * Number of failed copy attempts (physical inputs)
+   */
+  NUM_FAILED_SHUFFLE_INPUTS,
+  
   MERGED_MAP_OUTPUTS,
   GC_TIME_MILLIS,
   CPU_MILLISECONDS,
   PHYSICAL_MEMORY_BYTES,
   VIRTUAL_MEMORY_BYTES,
   COMMITTED_HEAP_BYTES,
+
+  /**
+   * Represents the number of Input Records that were actually processed.
+   * Used by MRInput and ShuffledUnorderedKVInput
+   * 
+   */
+  INPUT_RECORDS_PROCESSED,
   
-  INPUT_RECORDS, 
+  /**
+   * Represents the number of actual output records.
+   * Used by MROutput, OnFileSortedOutput, and OnFileUnorderedKVOutput
+   */
   OUTPUT_RECORDS,
-  SKIPPED_RECORDS,
+  
+  SKIPPED_RECORDS, // Not used at the moment.
+
+  /**
+   * Represents the serialized output size (uncompressed) of data being written.
+   */
   OUTPUT_BYTES,
-  OUTPUT_MATERIALIZED_BYTES,
-  INPUT_GROUPS,
+
+  /**
+   * Represents the serialized output size (uncompressed) along with any overhead
+   * added by the format being used.
+   */
+  OUTPUT_BYTES_WITH_OVERHEAD,
+
+  /**
+   * Represents the actual physical size of the Output generated. This factors
+   * in compression if it is enabled, i.e. the actual serialized output size
+   * plus overhead.
+   */
+  OUTPUT_BYTES_PHYSICAL,
+  
+  /**
+   * Bytes written to disk due to unnecessary spills (lack of adequate memory).
+   * Used by OnFileSortedOutput and ShuffledMergedInput
+   */
+  ADDITIONAL_SPILLS_BYTES_WRITTEN,
+  
+  /**
+   * Bytes read from disk due to previous spills (lack of adequate memory).
+   * Used by OnFileSortedOutput and ShuffledMergedInput
+   */
+  ADDITIONAL_SPILLS_BYTES_READ,
+  
+  /**
+   * Actual number of unnecessary spills (lack of adequate memory).
+   * Used by OnFileSortedOutput
+   */
+  ADDITIONAL_SPILL_COUNT,
+  
+  INPUT_GROUPS, // Not used at the moment. Will eventually replace REDUCE_INPUT_GROUPS
+
+  /**
+   * Amount of physical data moved over the wire. Used by Shuffled*Input.
+   * Should be the sum of SHUFFLE_BYTES_TO_MEM and SHUFFLE_BYTES_TO_DISK.
+   */
   SHUFFLE_BYTES,
-  SHUFFLED_TASKS, 
-  MERGED_TASK_OUTPUTS,
-}
+
+  /**
+   * Uncompressed size of the data being processed by the relevant Shuffle.
+   * Includes serialization, file format, and other overheads.
+   */
+  SHUFFLE_BYTES_DECOMPRESSED, 
+
+  /**
+   * Number of bytes which were shuffled directly to memory. 
+   */
+  SHUFFLE_BYTES_TO_MEM,
+
+  /**
+   * Number of bytes which were shuffled directly to disk.
+   */
+  SHUFFLE_BYTES_TO_DISK,
+
+  /**
+   * Number of Memory to Disk merges performed during sort-merge.
+   * Used by ShuffledMergedInput
+   */
+  NUM_MEM_TO_DISK_MERGES,
+
+  /**
+   * Number of disk to disk merges performed during the sort-merge
+   */
+  NUM_DISK_TO_DISK_MERGES,
+}
\ No newline at end of file
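
The renamed counters are meant to be used through the same lookup-then-increment
pattern that appears throughout this patch. A minimal sketch of that pattern,
assuming a component holding a TezInputContext named "inputContext" (the context
and the "fetchedBytes" value are illustrative, not from this commit):

  import org.apache.tez.common.counters.TaskCounter;
  import org.apache.tez.common.counters.TezCounter;

  // Resolve the enum-backed counter once during initialization ...
  TezCounter shuffleBytes =
      inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
  // ... then increment it on the hot path.
  shuffleBytes.increment(fetchedBytes);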

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 34b5527..1ac0295 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.serializer.Deserializer;
@@ -44,7 +43,6 @@ import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
@@ -52,7 +50,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.mapreduce.common.Utils;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
@@ -111,8 +108,7 @@ public class MRInput implements LogicalInput {
   protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
   
   private TezCounter inputRecordCounter;
-  private TezCounter fileInputByteCounter; 
-  private List<Statistics> fsStats;
+  // Potential counters - #splits, #totalSize, #actualBytesRead
   
   @Private
   volatile boolean splitInfoViaEvents;
@@ -148,9 +144,9 @@ public class MRInput implements LogicalInput {
     // TODO NEWTEZ Rename this to be specific to MRInput. This Input, in
     // theory, can be used by the MapProcessor, ReduceProcessor or a custom
     // processor. (The processor could provide the counter though)
-    this.inputRecordCounter = inputContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
-    this.fileInputByteCounter = inputContext.getCounters().findCounter(FileInputFormatCounter.BYTES_READ);
-    
+
+    this.inputRecordCounter = inputContext.getCounters().findCounter(TaskCounter.INPUT_RECORDS_PROCESSED);
+
     useNewApi = this.jobConf.getUseNewMapper();
     this.splitInfoViaEvents = jobConf.getBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
         MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS_DEFAULT);
@@ -206,17 +202,8 @@ public class MRInput implements LogicalInput {
   
   private void setupOldRecordReader() throws IOException {
     Preconditions.checkNotNull(oldInputSplit, "Input split hasn't yet been setup");
-    List<Statistics> matchedStats = null;
-    if (oldInputSplit instanceof FileSplit) {
-      matchedStats = Utils.getFsStatistics(((FileSplit) oldInputSplit).getPath(), this.jobConf);
-    }
-    fsStats = matchedStats;
-    
-    long bytesInPrev = getInputBytes();
     oldRecordReader = oldInputFormat.getRecordReader(oldInputSplit,
         this.jobConf, new MRReporter(inputContext, oldInputSplit));
-    long bytesInCurr = getInputBytes();
-    fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
     setIncrementalConfigParams(oldInputSplit);
   }
   
@@ -233,15 +220,7 @@ public class MRInput implements LogicalInput {
   }
   
   private void setupNewRecordReader() throws IOException {
-    Preconditions.checkNotNull(newInputSplit, "Input split hasn't yet been setup");
-    List<Statistics> matchedStats = null;
-    if (newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
-      matchedStats = Utils.getFsStatistics(
-          ((org.apache.hadoop.mapreduce.lib.input.FileSplit)
-              newInputSplit).getPath(), this.jobConf);
-    }
-    fsStats = matchedStats;
-    
+    Preconditions.checkNotNull(newInputSplit, "Input split hasn't yet been setup");    
     try {
       newRecordReader = newInputFormat.createRecordReader(newInputSplit, taskAttemptContext);
       newRecordReader.initialize(newInputSplit, taskAttemptContext);
@@ -295,15 +274,11 @@ public class MRInput implements LogicalInput {
 
   @Override
   public List<Event> close() throws IOException {
-    long bytesInPrev = getInputBytes();
     if (useNewApi) {
       newRecordReader.close();
     } else {
       oldRecordReader.close();
     }
-    long bytesInCurr = getInputBytes();
-    fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
-    
     return null;
   }
 
@@ -493,15 +468,6 @@ public class MRInput implements LogicalInput {
     LOG.info("Processing split: " + inputSplit);
   }
 
-  private long getInputBytes() {
-    if (fsStats == null) return 0;
-    long bytesRead = 0;
-    for (Statistics stat: fsStats) {
-      bytesRead = bytesRead + stat.getBytesRead();
-    }
-    return bytesRead;
-  }
-
   protected TaskSplitMetaInfo[] readSplits(Configuration conf)
       throws IOException {
     TaskSplitMetaInfo[] allTaskSplitMetaInfo;
@@ -533,7 +499,6 @@ public class MRInput implements LogicalInput {
     @Override
     public boolean next() throws IOException {
       boolean hasNext = false;
-      long bytesInPrev = getInputBytes();
       if (localNewApi) {
         try {
           hasNext = newRecordReader.nextKeyValue();
@@ -544,9 +509,6 @@ public class MRInput implements LogicalInput {
       } else {
         hasNext = oldRecordReader.next(key, value);
       }
-      long bytesInCurr = getInputBytes();
-      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
-      
       if (hasNext) {
         inputRecordCounter.increment(1);
       }
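
With the FileSystem statistics plumbing removed, MRInput's accounting reduces
to the single increment in next() above. A hedged sketch of the consuming side,
i.e. how a processor might drive the reader (the process() handler and the way
the reader is obtained are illustrative, not part of this commit):

  KeyValueReader reader = ...; // reader backed by MRInput
  while (reader.next()) {
    // next() bumps TaskCounter.INPUT_RECORDS_PROCESSED on each successful read
    process(reader.getCurrentKey(), reader.getCurrentValue()); // hypothetical
  }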

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 25a1e0f..2ecf602 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -28,7 +28,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
@@ -37,13 +36,11 @@ import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.mapreduce.common.Utils;
 import org.apache.tez.mapreduce.hadoop.MRConfig;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
@@ -77,8 +74,6 @@ public class MROutput implements LogicalOutput {
   org.apache.hadoop.mapred.RecordWriter oldRecordWriter;
 
   private TezCounter outputRecordCounter;
-  private TezCounter fileOutputByteCounter;
-  private List<Statistics> fsStats;
 
   private TaskAttemptContext newApiTaskAttemptContext;
   private org.apache.hadoop.mapred.TaskAttemptContext oldApiTaskAttemptContext;
@@ -123,10 +118,7 @@ public class MROutput implements LogicalOutput {
       }
     }
 
-    outputRecordCounter = outputContext.getCounters().findCounter(
-        TaskCounter.MAP_OUTPUT_RECORDS);
-    fileOutputByteCounter = outputContext.getCounters().findCounter(
-        FileOutputFormatCounter.BYTES_WRITTEN);
+    outputRecordCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);    
 
     if (useNewApi) {
       newApiTaskAttemptContext = createTaskAttemptContext(taskAttemptId);
@@ -138,26 +130,12 @@ public class MROutput implements LogicalOutput {
         throw new IOException(cnfe);
       }
 
-      List<Statistics> matchedStats = null;
-      if (newOutputFormat instanceof
-          org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
-        matchedStats =
-            Utils.getFsStatistics(
-                org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
-                    .getOutputPath(newApiTaskAttemptContext),
-                jobConf);
-      }
-      fsStats = matchedStats;
-
-      long bytesOutPrev = getOutputBytes();
       try {
         newRecordWriter =
             newOutputFormat.getRecordWriter(newApiTaskAttemptContext);
       } catch (InterruptedException e) {
         throw new IOException("Interrupted while creating record writer", e);
       }
-      long bytesOutCurr = getOutputBytes();
-      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
     } else {
       oldApiTaskAttemptContext =
           new org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl(
@@ -165,26 +143,12 @@ public class MROutput implements LogicalOutput {
               new MRTaskReporter(outputContext));
       oldOutputFormat = jobConf.getOutputFormat();
 
-      List<Statistics> matchedStats = null;
-      if (oldOutputFormat
-          instanceof org.apache.hadoop.mapred.FileOutputFormat) {
-        matchedStats =
-            Utils.getFsStatistics(
-                org.apache.hadoop.mapred.FileOutputFormat.getOutputPath(
-                    jobConf),
-                jobConf);
-      }
-      fsStats = matchedStats;
-
       FileSystem fs = FileSystem.get(jobConf);
       String finalName = getOutputName();
 
-      long bytesOutPrev = getOutputBytes();
       oldRecordWriter =
           oldOutputFormat.getRecordWriter(
               fs, jobConf, finalName, new MRReporter(outputContext));
-      long bytesOutCurr = getOutputBytes();
-      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
     }
     initCommitter(jobConf, useNewApi);
 
@@ -248,15 +212,6 @@ public class MROutput implements LogicalOutput {
         isMapperOutput, null);
   }
 
-  private long getOutputBytes() {
-    if (fsStats == null) return 0;
-    long bytesWritten = 0;
-    for (Statistics stat: fsStats) {
-      bytesWritten = bytesWritten + stat.getBytesWritten();
-    }
-    return bytesWritten;
-  }
-  
   private String getOutputFileNamePrefix() {
     String prefix = jobConf.get(MRJobConfig.MROUTPUT_FILE_NAME_PREFIX);
     if (prefix == null) {
@@ -281,7 +236,6 @@ public class MROutput implements LogicalOutput {
       @SuppressWarnings("unchecked")
       @Override
       public void write(Object key, Object value) throws IOException {
-        long bytesOutPrev = getOutputBytes();
         if (useNewWriter) {
           try {
             newRecordWriter.write(key, value);
@@ -292,9 +246,6 @@ public class MROutput implements LogicalOutput {
         } else {
           oldRecordWriter.write(key, value);
         }
-
-        long bytesOutCurr = getOutputBytes();
-        fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
         outputRecordCounter.increment(1);
       }
     };
@@ -312,7 +263,6 @@ public class MROutput implements LogicalOutput {
     }
 
     LOG.info("Closing Simple Output");
-    long bytesOutPrev = getOutputBytes();
     if (useNewApi) {
       try {
         newRecordWriter.close(newApiTaskAttemptContext);
@@ -322,8 +272,6 @@ public class MROutput implements LogicalOutput {
     } else {
       oldRecordWriter.close(null);
     }
-    long bytesOutCurr = getOutputBytes();
-    fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
     LOG.info("Closed Simple Output");
     return null;
   }
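
The output side mirrors this: the anonymous writer above now only counts
records, leaving byte accounting to the IFile layer. A sketch of the delegation
pattern it follows (names are illustrative; the real code branches between the
old and new RecordWriter APIs):

  public void write(Object key, Object value) throws IOException {
    recordWriter.write(key, value);    // underlying MR RecordWriter
    outputRecordCounter.increment(1);  // TaskCounter.OUTPUT_RECORDS
  }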

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index a5fda8c..9ab64ba 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -34,8 +34,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
@@ -57,14 +57,13 @@ import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezTaskStatus.State;
 import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
@@ -583,10 +582,6 @@ public abstract class MRTask {
     }
   }
 
-  public abstract TezCounter getOutputRecordsCounter();
-
-  public abstract TezCounter getInputRecordsCounter();
-
   public org.apache.hadoop.mapreduce.TaskAttemptContext getTaskAttemptContext() {
     return taskAttemptContext;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index b90cd11..27b52b2 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -36,8 +36,6 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.mapreduce.hadoop.mapreduce.MapContextImpl;
 import org.apache.tez.mapreduce.input.MRInput;
@@ -386,15 +384,4 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
     super.localizeConfiguration(jobConf);
     jobConf.setBoolean(JobContext.TASK_ISMAP, true);
   }
-
-  @Override
-  public TezCounter getOutputRecordsCounter() {
-    return processorContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS);
-  }
-
-  @Override
-  public TezCounter getInputRecordsCounter() {
-    return processorContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 41dff33..c5ade59 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
@@ -355,14 +354,4 @@ public class ReduceProcessor extends MRTask implements LogicalIOProcessor {
     jobConf.setBoolean(JobContext.TASK_ISMAP, false);
   }
 
-  @Override
-  public TezCounter getOutputRecordsCounter() {
-    return processorContext.getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
-  }
-
-  @Override
-  public TezCounter getInputRecordsCounter() {
-    return processorContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
-  }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 744d680..a8aec1f 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -151,7 +151,7 @@ public class TestMapProcessor {
     Path mapOutputFile = mapOutputs.getInputFile(new InputAttemptIdentifier(0, 0));
     LOG.info("mapOutputFile = " + mapOutputFile);
     IFile.Reader reader =
-        new IFile.Reader(localFs, mapOutputFile, null, null, false, 0, -1);
+        new IFile.Reader(localFs, mapOutputFile, null, null, null, false, 0, -1);
     LongWritable key = new LongWritable();
     Text value = new Text();
     DataInputBuffer keyBuf = new DataInputBuffer();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
index da74ebd..2354257 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
@@ -53,16 +54,21 @@ public class BroadcastKVReader<K, V> implements KeyValueReader {
   private final int ifileReadAheadLength;
   private final int ifileBufferSize;
   
+  private final TezCounter inputRecordCounter;
+  
   private K key;
   private V value;
   
   private FetchedInput currentFetchedInput;
   private IFile.Reader currentReader;
   
+  // TODO Remove this once per-I/O counters are separated properly. Relying on
+  // the counter at the moment will generate aggregate numbers. 
   private int numRecordsRead = 0;
   
   public BroadcastKVReader(BroadcastShuffleManager shuffleManager, Configuration conf,
-      CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize)
+      CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize,
+      TezCounter inputRecordCounter)
       throws IOException {
     this.shuffleManager = shuffleManager;
 
@@ -70,6 +76,7 @@ public class BroadcastKVReader<K, V> implements KeyValueReader {
     this.ifileReadAhead = ifileReadAhead;
     this.ifileReadAheadLength = ifileReadAheadLength;
     this.ifileBufferSize = ifileBufferSize;
+    this.inputRecordCounter = inputRecordCounter;
 
     this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
     this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
@@ -98,12 +105,14 @@ public class BroadcastKVReader<K, V> implements KeyValueReader {
   @Override  
   public boolean next() throws IOException {
     if (readNextFromCurrentReader()) {
+      inputRecordCounter.increment(1);
       numRecordsRead++;
       return true;
     } else {
       boolean nextInputExists = moveToNextInput();
       while (nextInputExists) {
         if(readNextFromCurrentReader()) {
+          inputRecordCounter.increment(1);
           numRecordsRead++;
           return true;
         }
@@ -181,7 +190,7 @@ public class BroadcastKVReader<K, V> implements KeyValueReader {
           mfi.getBytes(), 0, (int) mfi.getActualSize());
     } else {
       return new IFile.Reader(fetchedInput.getInputStream(),
-          fetchedInput.getCompressedSize(), codec, null, ifileReadAhead,
+          fetchedInput.getCompressedSize(), codec, null, null, ifileReadAhead,
           ifileReadAheadLength, ifileBufferSize);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index 776f186..ca58396 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
@@ -58,6 +60,7 @@ import org.apache.tez.runtime.library.common.InputIdentifier;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.shuffle.common.FetchResult;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
 import org.apache.tez.runtime.library.shuffle.common.Fetcher;
 import org.apache.tez.runtime.library.shuffle.common.Fetcher.FetcherBuilder;
@@ -129,7 +132,14 @@ public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCal
   
   private volatile long initialMemoryAvailable = -1l;
 
-  // TODO NEWTEZ Add counters.
+  private final TezCounter shuffledInputsCounter;
+  private final TezCounter failedShufflesCounter;
+  private final TezCounter bytesShuffledCounter;
+  private final TezCounter decompressedDataSizeCounter;
+  private final TezCounter bytesShuffledToDiskCounter;
+  private final TezCounter bytesShuffledToMemCounter;
+  
+  // TODO More counters - FetchErrors, speed?
   
   public BroadcastShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
     this.inputContext = inputContext;
@@ -137,6 +147,13 @@ public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCal
     this.numInputs = numInputs;
     long initalMemReq = getInitialMemoryReq();
     this.inputContext.requestInitialMemory(initalMemReq, this);
+    
+    this.shuffledInputsCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
+    this.failedShufflesCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
+    this.bytesShuffledCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
+    this.decompressedDataSizeCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
+    this.bytesShuffledToDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
+    this.bytesShuffledToMemCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
   }
 
   private void configureAndStart() throws IOException {
@@ -459,9 +476,9 @@ public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCal
   /////////////////// Methods from FetcherCallbackHandler
   
   @Override
-  public void fetchSucceeded(String host,
-      InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput, long fetchedBytes,
-      long copyDuration) throws IOException {
+  public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier,
+      FetchedInput fetchedInput, long fetchedBytes, long decompressedLength, long copyDuration)
+      throws IOException {
     InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();    
 
     LOG.info("Completed fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
@@ -480,6 +497,19 @@ public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCal
         if (!completedInputSet.contains(inputIdentifier)) {
           fetchedInput.commit();
           committed = true;
+          
+          // Processing counters for completed and committed fetches only. Need
+          // additional counters for excessive fetches - which primarily come
+          // in after speculation or retries.
+          shuffledInputsCounter.increment(1);
+          bytesShuffledCounter.increment(fetchedBytes);
+          if (fetchedInput.getType() == Type.MEMORY) {
+            bytesShuffledToMemCounter.increment(fetchedBytes);
+          } else {
+            bytesShuffledToDiskCounter.increment(fetchedBytes);
+          }
+          decompressedDataSizeCounter.increment(decompressedLength);
+
           registerCompletedInput(fetchedInput);
         }
       }
@@ -506,6 +536,7 @@ public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCal
     LOG.info("Fetch failed for src: " + srcAttemptIdentifier
         + "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: "
         + connectFailed);
+    failedShufflesCounter.increment(1);
     if (srcAttemptIdentifier == null) {
       String message = "Received fetchFailure for an unknown src (null)";
       LOG.fatal(message);
@@ -602,8 +633,9 @@ public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCal
   /////////////////// End of methods for walking the available inputs
 
   @SuppressWarnings("rawtypes")
-  public BroadcastKVReader createReader() throws IOException {
-    return new BroadcastKVReader(this, conf, codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize);
+  public BroadcastKVReader createReader(TezCounter inputRecordCounter) throws IOException {
+    return new BroadcastKVReader(this, conf, codec, ifileReadAhead, ifileReadAheadLength,
+        ifileBufferSize, inputRecordCounter);
   }
   
   /**
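
The per-fetch accounting above is set up so that SHUFFLE_BYTES ends up as the
sum of the two destination-specific counters. A sanity check one could run
after a task completes (not part of this commit; "counters" stands in for the
task's TezCounters):

  long toMem  = counters.findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM).getValue();
  long toDisk = counters.findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK).getValue();
  long total  = counters.findCounter(TaskCounter.SHUFFLE_BYTES).getValue();
  assert total == toMem + toDisk;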

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
index 7071b87..c5e6cc1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.common.ConfigUtils;
@@ -65,6 +67,16 @@ public class FileBasedKVWriter implements KeyValueWriter {
   private TezTaskOutput ouputFileManager;
   private boolean closed = false;
 
+  // Number of output key-value pairs
+  private final TezCounter outputRecordsCounter;
+  // Number of bytes of actual output - uncompressed.
+  private final TezCounter outputBytesCounter;
+  // Size of the data with additional meta-data
+  private final TezCounter outputBytesCounterWithOverhead;
+  // Actual physical size of the data on disk.
+  private final TezCounter outputMaterializedBytesCounter;
+  
+  
   // TODO NEWTEZ Define Counters
   // Number of records
   // Time waiting for a write to complete, if that's possible.
@@ -73,6 +85,11 @@ public class FileBasedKVWriter implements KeyValueWriter {
   public FileBasedKVWriter(TezOutputContext outputContext, Configuration conf) throws IOException {
     this.conf = conf;
 
+    this.outputRecordsCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
+    this.outputBytesCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+    this.outputBytesCounterWithOverhead = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
+    this.outputMaterializedBytesCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
+
     this.rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw();
 
     // Setup serialization
@@ -103,8 +120,11 @@ public class FileBasedKVWriter implements KeyValueWriter {
   public boolean close() throws IOException {
     this.closed = true;
     this.writer.close();
-    TezIndexRecord rec = new TezIndexRecord(0, writer.getRawLength(),
-        writer.getCompressedLength());
+    long rawLen = writer.getRawLength();
+    long compLen = writer.getCompressedLength();
+    outputBytesCounterWithOverhead.increment(rawLen);
+    outputMaterializedBytesCounter.increment(compLen);
+    TezIndexRecord rec = new TezIndexRecord(0, rawLen, compLen);
     TezSpillRecord sr = new TezSpillRecord(1);
     sr.putIndex(rec, 0);
 
@@ -118,6 +138,7 @@ public class FileBasedKVWriter implements KeyValueWriter {
   @Override
   public void write(Object key, Object value) throws IOException {
     this.writer.append(key, value);
+    this.outputRecordsCounter.increment(1);
     numRecords++;
   }
 
@@ -131,7 +152,7 @@ public class FileBasedKVWriter implements KeyValueWriter {
 
     // TODO NEWTEZ maybe use appropriate counter
     this.writer = new IFile.Writer(conf, rfs, outputPath, keyClass, valClass,
-        codec, null);
+        codec, null, outputBytesCounter);
   }
   
   public long getRawLength() {
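
FileBasedKVWriter now reports three related sizes for the same output:
OUTPUT_BYTES (raw serialized data, via the counter handed to IFile.Writer),
OUTPUT_BYTES_WITH_OVERHEAD (getRawLength(), i.e. data plus IFile framing) and
OUTPUT_BYTES_PHYSICAL (getCompressedLength(), what actually lands on disk).
A sketch of how they relate once the writer is closed:

  writer.close();
  long withOverhead = writer.getRawLength();        // serialized bytes + IFile overhead
  long physical     = writer.getCompressedLength(); // bytes actually on disk
  // Without a codec the two should match; with compression enabled,
  // physical is typically smaller than withOverhead.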

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
index fef3356..ab1cf7f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
@@ -39,6 +39,7 @@ import com.google.common.base.Preconditions;
  * lead to corrupt data.
  * 
  */
+
 public class ValuesIterator<KEY,VALUE> {
   protected TezRawKeyValueIterator in; //input iterator
   private KEY key;               // current key
@@ -176,6 +177,7 @@ public class ValuesIterator<KEY,VALUE> {
       DataInputBuffer nextKeyBytes = in.getKey();
       keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
       nextKey = keyDeserializer.deserialize(nextKey);
+      // TODO Is a counter increment required here?
       hasMoreValues = key != null && (comparator.compare(key, nextKey) == 0);
     } else {
       hasMoreValues = false;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
index 36723b0..fe05410 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
@@ -121,7 +121,7 @@ public class LocalShuffle {
         sortFactor,
         new Path(inputContext.getUniqueIdentifier()), // TODO NEWTEZ This is likely broken 
         comparator,
-        null, spilledRecordsCounter, null, null);
+        null, spilledRecordsCounter, null, null, null);
   }
   
   private Path[] getMapFiles() 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index 4fd9b53..97b57f3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -372,6 +372,7 @@ class Fetcher extends Thread {
       //Read the shuffle header
       try {
         ShuffleHeader header = new ShuffleHeader();
+        // TODO Review: Multiple header reads in case of status WAIT?
         header.readFields(input);
         if (!header.mapId.startsWith(InputAttemptIdentifier.PATH_PREFIX)) {
           throw new IllegalArgumentException(
@@ -412,6 +413,7 @@ class Fetcher extends Thread {
       
       // Check if we can shuffle *now* ...
       if (mapOutput.getType() == Type.WAIT) {
+        // TODO Review: Does this cause a tight loop?
         LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
         //Not an error but wait to process data.
         return EMPTY_ATTEMPT_ID_ARRAY;
@@ -431,7 +433,7 @@ class Fetcher extends Thread {
       
       // Inform the shuffle scheduler
       long endTime = System.currentTimeMillis();
-      scheduler.copySucceeded(srcAttemptId, host, compressedLength, 
+      scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength, 
                               endTime - startTime, mapOutput);
       // Note successful shuffle
       remaining.remove(srcAttemptId);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
index 479d704..9ed90d6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
@@ -45,7 +45,7 @@ public class InMemoryReader extends Reader {
   public InMemoryReader(MergeManager merger, InputAttemptIdentifier taskAttemptId,
                         byte[] data, int start, int length)
   throws IOException {
-    super(null, length - start, null, null, false, 0, -1);
+    super(null, length - start, null, null, null, false, 0, -1);
     this.merger = merger;
     this.taskAttemptId = taskAttemptId;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryWriter.java
index f81b28e..a9a86ff 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryWriter.java
@@ -38,8 +38,10 @@ public class InMemoryWriter extends Writer {
 
   private DataOutputStream out;
 
+  // TODO Verify and fix counters if required.
+  
   public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
-    super(null);
+    super(null, null);
     this.out =
       new DataOutputStream(new IFileOutputStream(arrayStream));
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
index 2fb6b08..b9c5fba 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.TezInputContext;
@@ -114,6 +115,11 @@ public class MergeManager {
 
   private final TezCounter mergedMapOutputsCounter;
   
+  private final TezCounter numMemToDiskMerges;
+  private final TezCounter numDiskToDiskMerges;
+  private final TezCounter additionalBytesWritten;
+  private final TezCounter additionalBytesRead;
+  
   private CompressionCodec codec;
   
   private volatile boolean finalMergeComplete = false;
@@ -149,6 +155,11 @@ public class MergeManager {
     
     this.localFS = localFS;
     this.rfs = ((LocalFileSystem)localFS).getRaw();
+    
+    this.numDiskToDiskMerges = inputContext.getCounters().findCounter(TaskCounter.NUM_DISK_TO_DISK_MERGES);
+    this.numMemToDiskMerges = inputContext.getCounters().findCounter(TaskCounter.NUM_MEM_TO_DISK_MERGES);
+    this.additionalBytesWritten = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
+    this.additionalBytesRead = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
 
   }
   
@@ -393,6 +404,8 @@ public class MergeManager {
 
     synchronized (inMemoryMerger) {
       // Can hang if mergeThreshold is really low.
+      // TODO Can avoid spilling in case total input size is between
+      // mergeThreshold and total available size.
       if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold) {
         LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
             commitMemory + " > mergeThreshold=" + mergeThreshold + 
@@ -402,7 +415,8 @@ public class MergeManager {
         inMemoryMerger.startMerge(inMemoryMapOutputs);
       } 
     }
-    
+
+    // This should likely run a Combiner.
     if (memToMemMerger != null) {
       synchronized (memToMemMerger) {
         if (!memToMemMerger.isInProgress() && 
@@ -466,6 +480,9 @@ public class MergeManager {
     combiner.combine(kvIter, writer);
   }
 
+  /**
+   * Merges multiple in-memory segments into another in-memory segment.
+   */
   private class IntermediateMemoryToMemoryMerger 
   extends MergeThread<MapOutput> {
     
@@ -494,10 +511,13 @@ public class MergeManager {
       
       Writer writer = 
         new InMemoryWriter(mergedMapOutputs.getArrayStream());
-      
+
       LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
                " segments of total-size: " + mergeOutputSize);
 
+      // Nothing will be materialized to disk because the sort factor is being
+      // set to the number of in-memory segments.
+      // TODO Is this doing any combination?
       TezRawKeyValueIterator rIter = 
         TezMerger.merge(conf, rfs,
                        ConfigUtils.getIntermediateInputKeyClass(conf),
@@ -505,7 +525,7 @@ public class MergeManager {
                        inMemorySegments, inMemorySegments.size(),
                        new Path(inputContext.getUniqueIdentifier()),
                        (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
-                       nullProgressable, null, null, null);
+                       nullProgressable, null, null, null, null); 
       TezMerger.writeFile(rIter, writer, nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
       writer.close();
 
@@ -518,6 +538,9 @@ public class MergeManager {
     }
   }
   
+  /**
+   * Merges multiple in-memory segments into a disk segment.
+   */
   private class InMemoryMerger extends MergeThread<MapOutput> {
     
     public InMemoryMerger(MergeManager manager) {
@@ -533,6 +556,8 @@ public class MergeManager {
         return;
       }
       
+      numMemToDiskMerges.increment(1);
+      
       //name this output file same as the name of the first file that is 
       //there in the current list of inmem files (this is guaranteed to
       //be absent on the disk currently. So we don't overwrite a prev. 
@@ -549,6 +574,10 @@ public class MergeManager {
         createInMemorySegments(inputs, inMemorySegments,0);
       int noInMemorySegments = inMemorySegments.size();
 
+      // TODO Maybe track serialized vs deserialized bytes.
+      
+      // All disk writes done by this merge are overhead - due to the lack of
+      // adequate memory to keep all segments in memory.
       Path outputPath = mapOutputFile.getInputFileForWrite(
           srcTaskIdentifier.getInputIdentifier().getInputIndex(),
           mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX);
@@ -559,26 +588,32 @@ public class MergeManager {
             new Writer(conf, rfs, outputPath,
                 (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
                 (Class)ConfigUtils.getIntermediateInputValueClass(conf),
-                codec, null);
+                codec, null, null);
 
         TezRawKeyValueIterator rIter = null;
         LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
             " segments...");
 
+        // Nothing actually materialized to disk - controlled by setting sort-factor to #segments.
         rIter = TezMerger.merge(conf, rfs,
             (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
             (Class)ConfigUtils.getIntermediateInputValueClass(conf),
             inMemorySegments, inMemorySegments.size(),
             new Path(inputContext.getUniqueIdentifier()),
             (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
-            nullProgressable, spilledRecordsCounter, null, null);
+            nullProgressable, spilledRecordsCounter, null, additionalBytesRead, null);
+        // spilledRecordsCounter is tracking the number of keys that will be
+        // read from each of the segments being merged - which is essentially
+        // what will be written to disk.
 
         if (null == combiner) {
           TezMerger.writeFile(rIter, writer, nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
         } else {
+          // TODO Counters for Combine
           runCombineProcessor(rIter, writer);
         }
         writer.close();
+        additionalBytesWritten.increment(writer.getCompressedLength());
         writer = null;
 
         LOG.info(inputContext.getUniqueIdentifier() +  
@@ -602,7 +637,10 @@ public class MergeManager {
     }
 
   }
-  
+
+  /**
+   * Merges multiple on-disk segments
+   */
   private class OnDiskMerger extends MergeThread<Path> {
     
     public OnDiskMerger(MergeManager manager) {
@@ -618,6 +656,7 @@ public class MergeManager {
         LOG.info("No ondisk files to merge...");
         return;
       }
+      numDiskToDiskMerges.increment(1);
       
       long approxOutputSize = 0;
       int bytesPerSum = 
@@ -643,7 +682,7 @@ public class MergeManager {
         new Writer(conf, rfs, outputPath, 
                         (Class)ConfigUtils.getIntermediateInputKeyClass(conf), 
                         (Class)ConfigUtils.getIntermediateInputValueClass(conf),
-                        codec, null);
+                        codec, null, null);
       TezRawKeyValueIterator iter  = null;
       Path tmpDir = new Path(inputContext.getUniqueIdentifier());
       try {
@@ -656,8 +695,12 @@ public class MergeManager {
                             nullProgressable, spilledRecordsCounter, null, 
                             mergedMapOutputsCounter, null);
 
+        // TODO Maybe differentiate between data written because of Merges and
+        // the finalMerge (i.e. final mem available may be different from
+        // initial merge mem)
         TezMerger.writeFile(iter, writer, nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
         writer.close();
+        additionalBytesWritten.increment(writer.getCompressedLength());
       } catch (IOException e) {
         localFS.delete(outputPath, true);
         throw e;
@@ -707,7 +750,7 @@ public class MergeManager {
 
     public RawKVIteratorReader(TezRawKeyValueIterator kvIter, long size)
         throws IOException {
-      super(null, size, null, spilledRecordsCounter, ifileReadAhead,
+      super(null, size, null, spilledRecordsCounter, null, ifileReadAhead,
           ifileReadAheadLength, ifileBufferSize);
       this.kvIter = kvIter;
     }
@@ -782,11 +825,11 @@ public class MergeManager {
           mapOutputFile.getInputFileForWrite(srcTaskId,
                                              inMemToDiskBytes).suffix(
                                                  Constants.MERGED_OUTPUT_PREFIX);
-        final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs,
-            keyClass, valueClass, memDiskSegments, numMemDiskSegments,
-            tmpDir, comparator, nullProgressable, spilledRecordsCounter, null, null);
+        final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs, keyClass, valueClass,
+            memDiskSegments, numMemDiskSegments, tmpDir, comparator, nullProgressable,
+            spilledRecordsCounter, null, additionalBytesRead, null);
         final Writer writer = new Writer(job, fs, outputPath,
-            keyClass, valueClass, codec, null);
+            keyClass, valueClass, codec, null, null);
         try {
           TezMerger.writeFile(rIter, writer, nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
           // add to list of final disk outputs.
@@ -803,6 +846,7 @@ public class MergeManager {
         } finally {
           if (null != writer) {
             writer.close();
+            additionalBytesWritten.increment(writer.getCompressedLength());
           }
         }
         LOG.info("Merged " + numMemDiskSegments + " segments, " +
@@ -856,18 +900,18 @@ public class MergeManager {
       TezRawKeyValueIterator diskMerge = TezMerger.merge(
           job, fs, keyClass, valueClass, diskSegments,
           ioSortFactor, numInMemSegments, tmpDir, comparator,
-          nullProgressable, false, spilledRecordsCounter, null, null);
+          nullProgressable, false, spilledRecordsCounter, null, additionalBytesRead, null);
       diskSegments.clear();
       if (0 == finalSegments.size()) {
         return diskMerge;
       }
       finalSegments.add(new Segment(
             new RawKVIteratorReader(diskMerge, onDiskBytes), true));
-    }
+    } 
+    // This is doing nothing but creating an iterator over the segments.
     return TezMerger.merge(job, fs, keyClass, valueClass,
                  finalSegments, finalSegments.size(), tmpDir,
                  comparator, nullProgressable, spilledRecordsCounter, null,
-                 null);
-  
+                 additionalBytesRead, null);
   }
 }
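
Taken together, the MergeManager changes thread two new counters through the merge paths: additionalBytesRead (compressed bytes re-read from previously spilled segments) and additionalBytesWritten (compressed bytes re-written by intermediate merge passes, captured from writer.getCompressedLength() after close). A minimal, runnable sketch of that accounting - with a stand-in counter class, since TezCounter lives in tez-api, and hypothetical segment sizes:

    // Stand-in with the same increment()/getValue() semantics as TezCounter.
    public class SpillAccountingSketch {
      static final class Counter {
        private long value;
        void increment(long delta) { value += delta; }
        long getValue() { return value; }
      }

      public static void main(String[] args) {
        Counter additionalBytesRead = new Counter();
        Counter additionalBytesWritten = new Counter();
        Counter numDiskToDiskMerges = new Counter();

        // Hypothetical compressed sizes of three on-disk segments consumed
        // by one intermediate (non-final) merge pass.
        long[] segmentSizes = { 4096L, 8192L, 2048L };
        numDiskToDiskMerges.increment(1);
        for (long size : segmentSizes) {
          additionalBytesRead.increment(size);  // each segment is re-read once
        }
        // Compressed length of the merged output, as reported by the writer.
        long mergedLength = 13500L;
        additionalBytesWritten.increment(mergedLength);

        System.out.println("read=" + additionalBytesRead.getValue()
            + " written=" + additionalBytesWritten.getValue()
            + " diskMerges=" + numDiskToDiskMerges.getValue());
      }
    }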

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index 8653b44..583e1a1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -129,18 +129,24 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
         new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
 
     // TODO TEZ Get rid of Map / Reduce references.
-    TezCounter shuffledMapsCounter = 
-        inputContext.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS);
+    TezCounter shuffledInputsCounter = 
+        inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
     TezCounter reduceShuffleBytes =
-        inputContext.getCounters().findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES);
+        inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
+    TezCounter reduceDataSizeDecompressed = inputContext.getCounters().findCounter(
+        TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
     TezCounter failedShuffleCounter =
-        inputContext.getCounters().findCounter(TaskCounter.FAILED_SHUFFLE);
+        inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
     TezCounter spilledRecordsCounter = 
         inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
     TezCounter reduceCombineInputCounter =
         inputContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
     TezCounter mergedMapOutputsCounter =
         inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
+    TezCounter bytesShuffledToDisk = inputContext.getCounters().findCounter(
+        TaskCounter.SHUFFLE_BYTES_TO_DISK);
+    TezCounter bytesShuffledToMem = inputContext.getCounters().findCounter(
+        TaskCounter.SHUFFLE_BYTES_TO_MEM);
     
     LOG.info("Shuffle assigned with " + numInputs + " inputs" + ", codec: "
         + (codec == null ? "None" : codec.getClass().getName()) + 
@@ -151,9 +157,12 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
           this.conf,
           this.numInputs,
           this,
-          shuffledMapsCounter,
+          shuffledInputsCounter,
           reduceShuffleBytes,
-          failedShuffleCounter);
+          reduceDataSizeDecompressed,
+          failedShuffleCounter,
+          bytesShuffledToDisk,
+          bytesShuffledToMem);
     eventHandler= new ShuffleInputEventHandler(
           inputContext,
           scheduler);
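
For reference, every counter above is resolved through the same lookup; in the Hadoop-style counter semantics that Tez inherits, findCounter() creates the counter on first access, so the renamed TaskCounter entries exist even before any input is fetched. A fragment-level sketch, assuming a live TezInputContext (the inputContext and compressedLen names are illustrative):

    import org.apache.tez.common.counters.TaskCounter;
    import org.apache.tez.common.counters.TezCounter;

    // Inside an Input holding a TezInputContext:
    TezCounter shuffledInputs = inputContext.getCounters()
        .findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
    TezCounter shuffleBytes = inputContext.getCounters()
        .findCounter(TaskCounter.SHUFFLE_BYTES);

    // On each successfully fetched input:
    shuffledInputs.increment(1);
    shuffleBytes.increment(compressedLen);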

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index 9106f95..b33b838 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -45,6 +45,7 @@ import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.common.shuffle.impl.MapOutput.Type;
 
 import com.google.common.collect.Lists;
 
@@ -82,7 +83,10 @@ class ShuffleScheduler {
   private final int abortFailureLimit;
   private final TezCounter shuffledMapsCounter;
   private final TezCounter reduceShuffleBytes;
+  private final TezCounter reduceBytesDecompressed;
   private final TezCounter failedShuffleCounter;
+  private final TezCounter bytesShuffledToDisk;
+  private final TezCounter bytesShuffledToMem;
   
   private final long startTime;
   private long lastProgressTime;
@@ -102,7 +106,10 @@ class ShuffleScheduler {
                           Shuffle shuffle,
                           TezCounter shuffledMapsCounter,
                           TezCounter reduceShuffleBytes,
-                          TezCounter failedShuffleCounter) {
+                          TezCounter reduceBytesDecompressed,
+                          TezCounter failedShuffleCounter,
+                          TezCounter bytesShuffledToDisk,
+                          TezCounter bytesShuffledToMem) {
     this.inputContext = inputContext;
     this.numInputs = numberOfInputs;
     abortFailureLimit = Math.max(30, numberOfInputs / 10);
@@ -111,7 +118,10 @@ class ShuffleScheduler {
     this.shuffle = shuffle;
     this.shuffledMapsCounter = shuffledMapsCounter;
     this.reduceShuffleBytes = reduceShuffleBytes;
+    this.reduceBytesDecompressed = reduceBytesDecompressed;
     this.failedShuffleCounter = failedShuffleCounter;
+    this.bytesShuffledToDisk = bytesShuffledToDisk;
+    this.bytesShuffledToMem = bytesShuffledToMem;
     this.startTime = System.currentTimeMillis();
     this.lastProgressTime = startTime;
     this.maxFailedUniqueFetches = Math.min(numberOfInputs,
@@ -129,7 +139,8 @@ class ShuffleScheduler {
 
   public synchronized void copySucceeded(InputAttemptIdentifier srcAttemptIdentifier, 
                                          MapHost host,
-                                         long bytes,
+                                         long bytesCompressed,
+                                         long bytesDecompressed,
                                          long milis,
                                          MapOutput output
                                          ) throws IOException {
@@ -147,9 +158,15 @@ class ShuffleScheduler {
 
       // update the status
       lastProgressTime = System.currentTimeMillis();
-      totalBytesShuffledTillNow += bytes;
+      totalBytesShuffledTillNow += bytesCompressed;
       logProgress();
-      reduceShuffleBytes.increment(bytes);
+      reduceShuffleBytes.increment(bytesCompressed);
+      reduceBytesDecompressed.increment(bytesDecompressed);
+      if (output.getType() == Type.DISK) {
+        bytesShuffledToDisk.increment(bytesCompressed);
+      } else {
+        bytesShuffledToMem.increment(bytesCompressed);
+      }
       if (LOG.isDebugEnabled()) {
         LOG.debug("src task: "
             + TezRuntimeUtils.getTaskAttemptIdentifier(
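
The copySucceeded() change also establishes an invariant worth keeping in mind when reading these counters: every compressed byte counted in SHUFFLE_BYTES is attributed to exactly one of SHUFFLE_BYTES_TO_DISK or SHUFFLE_BYTES_TO_MEM, depending on where the fetched MapOutput landed. A runnable sketch of the split, using stand-in counters and hypothetical fetch sizes:

    import java.util.concurrent.atomic.AtomicLong;

    public class ShuffleByteSplitSketch {
      enum Type { MEMORY, DISK }

      public static void main(String[] args) {
        AtomicLong shuffleBytes = new AtomicLong();
        AtomicLong toDisk = new AtomicLong();
        AtomicLong toMem = new AtomicLong();

        // Hypothetical fetches: compressed size and where each one landed.
        long[] sizes = { 1024L, 65536L, 512L };
        Type[] dest = { Type.MEMORY, Type.DISK, Type.MEMORY };

        for (int i = 0; i < sizes.length; i++) {
          shuffleBytes.addAndGet(sizes[i]);
          if (dest[i] == Type.DISK) {
            toDisk.addAndGet(sizes[i]);   // bytesShuffledToDisk
          } else {
            toMem.addAndGet(sizes[i]);    // bytesShuffledToMem
          }
        }
        // SHUFFLE_BYTES == SHUFFLE_BYTES_TO_DISK + SHUFFLE_BYTES_TO_MEM
        System.out.println("total=" + shuffleBytes
            + " disk=" + toDisk + " mem=" + toMem);
      }
    }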

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index 3e3b25f..ab8a869 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -94,11 +94,31 @@ public abstract class ExternalSorter implements MemoryUpdateCallback {
   protected CompressionCodec codec;
 
   // Counters
-  // TODO TEZ Rename all counter variables [Mapping of counter to MR for compatibility in the MR layer]
+  // The MR compatibility layer needs to rename counters back to what MR requires.
+
+  // Represents final deserialized size of output (spills are not counted)
   protected TezCounter mapOutputByteCounter;
+  // Represents final number of records written (spills are not counted)
   protected TezCounter mapOutputRecordCounter;
+  // Represents the size of the final output - with any overheads introduced by
+  // the storage/serialization mechanism. This is an uncompressed data size.
+  protected TezCounter outputBytesWithOverheadCounter;
+  // Represents the size of the final output - which will be transmitted over
+  // the wire (spills are not counted). Factors in compression if it is enabled.
   protected TezCounter fileOutputByteCounter;
+  // Represents the total number of records written to disk (includes spills;
+  // the minimum value for this equals the number of output records)
   protected TezCounter spilledRecordsCounter;
+  // Bytes written as a result of additional spills. The single spill for the
+  // final output data is not counted. (This will be 0 if there are no
+  // additional spills. This is a compressed size, so it may not represent the
+  // size in the sort buffer.)
+  protected TezCounter additionalSpillBytesWritten;
+  
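+  // Bytes read during additional merge passes over already-spilled data.
+  // (This will be 0 if there are no additional spills.)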
+  protected TezCounter additionalSpillBytesRead;
+  // Number of additional spills. (This will be 0 if there are no additional
+  // spills.)
+  protected TezCounter numAdditionalSpills;
 
   @Private
   public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
@@ -131,15 +151,16 @@ public abstract class ExternalSorter implements MemoryUpdateCallback {
     keySerializer = serializationFactory.getSerializer(keyClass);
     valSerializer = serializationFactory.getSerializer(valClass);
 
-    //    counters
-    mapOutputByteCounter =
-        outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_BYTES);
-    mapOutputRecordCounter =
-        outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS);
-    fileOutputByteCounter =
-        outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
-    spilledRecordsCounter =
-        outputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
+    // counters
+    mapOutputByteCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+    mapOutputRecordCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
+    outputBytesWithOverheadCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
+    fileOutputByteCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
+    spilledRecordsCounter = outputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
+    additionalSpillBytesWritten = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
+    additionalSpillBytesRead = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
+    numAdditionalSpills = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
+
     // compression
     if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
       Class<? extends CompressionCodec> codecClass =
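
The comment block above defines a size hierarchy for the sorted output: OUTPUT_BYTES (the raw key/value payload), OUTPUT_BYTES_WITH_OVERHEAD (payload plus uncompressed storage overhead, such as the per-record length prefixes visible in the IFile.Writer diff below), and OUTPUT_BYTES_PHYSICAL (the compressed bytes that actually hit disk or the wire). A small runnable sketch of how the first two relate for a single record - the record sizes are hypothetical, and the vint prefixes stand in for whatever overhead the storage format adds:

    import org.apache.hadoop.io.WritableUtils;

    public class OutputSizeSketch {
      public static void main(String[] args) {
        // Hypothetical serialized lengths of one key/value pair.
        int keyLength = 12;
        int valueLength = 300;

        long payload = keyLength + valueLength;       // -> OUTPUT_BYTES
        // IFile prefixes each record with vint-encoded lengths; this is the
        // sort of overhead OUTPUT_BYTES_WITH_OVERHEAD accounts for.
        long withOverhead = payload
            + WritableUtils.getVIntSize(keyLength)
            + WritableUtils.getVIntSize(valueLength); // -> OUTPUT_BYTES_WITH_OVERHEAD

        System.out.println("payload=" + payload + " withOverhead=" + withOverhead);
        // OUTPUT_BYTES_PHYSICAL would be the compressed size of the stream
        // carrying these records, typically <= withOverhead when compressed.
      }
    }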

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index bdafdd0..e7545fc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -85,6 +85,7 @@ public class IFile {
     // Count records written to disk
     private long numRecordsWritten = 0;
     private final TezCounter writtenRecordsCounter;
+    private final TezCounter serializedUncompressedBytes;
 
     IFileOutputStream checksumOut;
 
@@ -102,21 +103,24 @@ public class IFile {
     public Writer(Configuration conf, FileSystem fs, Path file, 
                   Class keyClass, Class valueClass,
                   CompressionCodec codec,
-                  TezCounter writesCounter) throws IOException {
+                  TezCounter writesCounter,
+                  TezCounter serializedBytesCounter) throws IOException {
       this(conf, fs.create(file), keyClass, valueClass, codec,
-           writesCounter);
+           writesCounter, serializedBytesCounter);
       ownOutputStream = true;
     }
     
-    protected Writer(TezCounter writesCounter) {
+    protected Writer(TezCounter writesCounter, TezCounter serializedBytesCounter) {
       writtenRecordsCounter = writesCounter;
+      serializedUncompressedBytes = serializedBytesCounter;
     }
 
     public Writer(Configuration conf, FSDataOutputStream out, 
         Class keyClass, Class valueClass,
-        CompressionCodec codec, TezCounter writesCounter)
+        CompressionCodec codec, TezCounter writesCounter, TezCounter serializedBytesCounter)
         throws IOException {
       this.writtenRecordsCounter = writesCounter;
+      this.serializedUncompressedBytes = serializedBytesCounter;
       this.checksumOut = new IFileOutputStream(out);
       this.rawOut = out;
       this.start = this.rawOut.getPos();
@@ -150,7 +154,7 @@ public class IFile {
 
     public Writer(Configuration conf, FileSystem fs, Path file) 
     throws IOException {
-      this(conf, fs, file, null, null, null, null);
+      this(conf, fs, file, null, null, null, null, null);
     }
 
     public void close() throws IOException {
@@ -237,7 +241,7 @@ public class IFile {
                               valueLength + " for " + value);
       }
       
-      if(rle && sameKey) {        
+      if(rle && sameKey) {
         WritableUtils.writeVInt(out, RLE_MARKER);                   // Same key as previous
         WritableUtils.writeVInt(out, valueLength);                  // value length
         out.write(buffer.getData(), keyLength, buffer.getLength()); // only the value
@@ -245,6 +249,9 @@ public class IFile {
         decompressedBytesWritten += 0 + valueLength + 
                                     WritableUtils.getVIntSize(RLE_MARKER) + 
                                     WritableUtils.getVIntSize(valueLength);
+        if (serializedUncompressedBytes != null) {
+          serializedUncompressedBytes.increment(0 + valueLength);
+        }
       } else {        
         // Write the record out        
         WritableUtils.writeVInt(out, keyLength);                  // key length
@@ -254,6 +261,9 @@ public class IFile {
         decompressedBytesWritten += keyLength + valueLength + 
                                     WritableUtils.getVIntSize(keyLength) + 
                                     WritableUtils.getVIntSize(valueLength);
+        if (serializedUncompressedBytes != null) {
+          serializedUncompressedBytes.increment(keyLength + valueLength);
+        }
       }
 
       // Reset
@@ -292,6 +302,9 @@ public class IFile {
         decompressedBytesWritten += 0 + valueLength
             + WritableUtils.getVIntSize(RLE_MARKER)
             + WritableUtils.getVIntSize(valueLength);
+        if (serializedUncompressedBytes != null) {
+          serializedUncompressedBytes.increment(0 + valueLength);
+        }
       } else {
         WritableUtils.writeVInt(out, keyLength);
         WritableUtils.writeVInt(out, valueLength);
@@ -302,6 +315,9 @@ public class IFile {
         decompressedBytesWritten += keyLength + valueLength
             + WritableUtils.getVIntSize(keyLength)
             + WritableUtils.getVIntSize(valueLength);
+        if (serializedUncompressedBytes != null) {
+          serializedUncompressedBytes.increment(keyLength + valueLength);
+        }
                 
         BufferUtils.copy(key, previous);        
       }
@@ -348,6 +364,7 @@ public class IFile {
     // Count records read from disk
     private long numRecordsRead = 0;
     private final TezCounter readRecordsCounter;
+    private final TezCounter bytesReadCounter;
 
     final InputStream in;        // Possibly decompressed stream that we read
     Decompressor decompressor;
@@ -366,6 +383,8 @@ public class IFile {
     protected int currentValueLength;
     byte keyBytes[] = new byte[0];
     
+    long startPos;
+    
     
     /**
      * Construct an IFile Reader.
@@ -379,11 +398,11 @@ public class IFile {
      */
     public Reader(FileSystem fs, Path file,
                   CompressionCodec codec,
-                  TezCounter readsCounter, boolean ifileReadAhead,
+                  TezCounter readsCounter, TezCounter bytesReadCounter, boolean ifileReadAhead,
                   int ifileReadAheadLength, int bufferSize) throws IOException {
       this(fs.open(file), 
            fs.getFileStatus(file).getLen(),
-           codec, readsCounter, ifileReadAhead, ifileReadAheadLength, bufferSize);
+           codec, readsCounter, bytesReadCounter, ifileReadAhead, ifileReadAheadLength, bufferSize);
     }
 
     /**
@@ -398,10 +417,11 @@ public class IFile {
      */
     public Reader(InputStream in, long length, 
                   CompressionCodec codec,
-                  TezCounter readsCounter,
+                  TezCounter readsCounter, TezCounter bytesReadCounter,
                   boolean readAhead, int readAheadLength,
                   int bufferSize) throws IOException {
       readRecordsCounter = readsCounter;
+      this.bytesReadCounter = bytesReadCounter;
       checksumIn = new IFileInputStream(in,length, readAhead, readAheadLength);
       if (codec != null) {
         decompressor = CodecPool.getDecompressor(codec);
@@ -417,6 +437,8 @@ public class IFile {
       this.dataIn = new DataInputStream(this.in);
       this.fileLength = length;
       
+      startPos = checksumIn.getPosition();
+      
       if (bufferSize != -1) {
         this.bufferSize = bufferSize;
       }
@@ -537,7 +559,7 @@ public class IFile {
     public void close() throws IOException {
       // Close the underlying stream
       in.close();
-      
+
       // Release the buffer
       dataIn = null;
       buffer = null;
@@ -545,6 +567,10 @@ public class IFile {
         readRecordsCounter.increment(numRecordsRead);
       }
 
+      if (bytesReadCounter != null) {
+        bytesReadCounter.increment(checksumIn.getPosition() - startPos + checksumIn.getSize());
+      }
+      
       // Return the decompressor
       if (decompressor != null) {
         decompressor.reset();
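
Both new counter parameters are optional; every increment above is null-guarded, so call sites that do not care (e.g. PipelinedSorter below) simply pass null. A fragment-level sketch of the updated constructors in use - it assumes a Configuration, FileSystem, Path, codec and counters are already in hand, and uses Text keys/values purely for illustration:

    // Writing: the records counter plus the new serialized-bytes counter.
    IFile.Writer writer = new IFile.Writer(conf, fs, path,
        Text.class, Text.class, codec,
        writtenRecordsCounter /* nullable */, serializedBytesCounter /* nullable */);
    writer.append(new Text("key"), new Text("value"));
    writer.close();  // compressed length then available via getCompressedLength()

    // Reading: the records counter plus the new bytes-read counter. The last
    // three arguments disable read-ahead and keep the default buffer size.
    IFile.Reader reader = new IFile.Reader(fs, path, codec,
        readRecordsCounter /* nullable */, bytesReadCounter /* nullable */,
        false, 0, -1);
    // consume records here, then:
    reader.close();  // bytesReadCounter is incremented here, measured from startPos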

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/351a6105/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 6bfa098..e6d7d31 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -90,6 +90,8 @@ public class PipelinedSorter extends ExternalSorter {
   private int totalIndexCacheMemory;
   private int indexCacheMemoryLimit;
 
+  // TODO Set additional counters - total bytes written, spills, etc.
+  
   @Override
   public void start() throws IOException {
     
@@ -266,7 +268,7 @@ public class PipelinedSorter extends ExternalSorter {
         long segmentStart = out.getPos();
         Writer writer =
           new Writer(conf, out, keyClass, valClass, codec,
-              spilledRecordsCounter);
+              spilledRecordsCounter, null);
         writer.setRLE(merger.needsRLE());
         if (combiner == null) {
           while(kvIter.next()) {
@@ -370,14 +372,14 @@ public class PipelinedSorter extends ExternalSorter {
                      new Path(uniqueIdentifier),
                      (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf), 
                      nullProgressable, sortSegments,
-                     null, spilledRecordsCounter,
+                     null, spilledRecordsCounter, null,
                      null); // Not using any Progress in TezMerger. Should just work.
 
       //write merged output to disk
       long segmentStart = finalOut.getPos();
       Writer writer =
           new Writer(conf, finalOut, keyClass, valClass, codec,
-                           spilledRecordsCounter);
+                           spilledRecordsCounter, null);
       writer.setRLE(merger.needsRLE());
       if (combiner == null || numSpills < minSpillsForCombine) {
         TezMerger.writeFile(kvIter, writer, nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);