You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:37 UTC

[30/50] [abbrv] Rename tez-engine-api to tez-runtime-api and tez-engine is split into 2: - tez-engine-library for user-visible Input/Output/Processor implementations - tez-engine-internals for framework internals

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
new file mode 100644
index 0000000..bb4b4a2
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -0,0 +1,798 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.PriorityQueue;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+
+/**
+ * Merger is an utility class used by the Map and Reduce tasks for merging
+ * both their memory and disk segments
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class TezMerger {  
+  private static final Log LOG = LogFactory.getLog(TezMerger.class);
+
+  
+  // Local directories
+  private static LocalDirAllocator lDirAlloc = 
+    new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+
+  public static
+  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class keyClass, Class valueClass, 
+                            CompressionCodec codec,
+                            Path[] inputs, boolean deleteInputs, 
+                            int mergeFactor, Path tmpDir,
+                            RawComparator comparator, Progressable reporter,
+                            TezCounter readsCounter,
+                            TezCounter writesCounter,
+                            Progress mergePhase)
+  throws IOException {
+    return 
+      new MergeQueue(conf, fs, inputs, deleteInputs, codec, comparator, 
+                           reporter, null).merge(keyClass, valueClass,
+                                           mergeFactor, tmpDir,
+                                           readsCounter, writesCounter, 
+                                           mergePhase);
+  }
+
+  public static 
+  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class keyClass, Class valueClass, 
+                            CompressionCodec codec,
+                            Path[] inputs, boolean deleteInputs, 
+                            int mergeFactor, Path tmpDir,
+                            RawComparator comparator,
+                            Progressable reporter,
+                            TezCounter readsCounter,
+                            TezCounter writesCounter,
+                            TezCounter mergedMapOutputsCounter,
+                            Progress mergePhase)
+  throws IOException {
+    return 
+      new MergeQueue(conf, fs, inputs, deleteInputs, codec, comparator, 
+                           reporter, mergedMapOutputsCounter).merge(
+                                           keyClass, valueClass,
+                                           mergeFactor, tmpDir,
+                                           readsCounter, writesCounter,
+                                           mergePhase);
+  }
+  
+  public static
+  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs, 
+                            Class keyClass, Class valueClass, 
+                            List<Segment> segments, 
+                            int mergeFactor, Path tmpDir,
+                            RawComparator comparator, Progressable reporter,
+                            TezCounter readsCounter,
+                            TezCounter writesCounter,
+                            Progress mergePhase)
+      throws IOException {
+    return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
+                 comparator, reporter, false, readsCounter, writesCounter,
+                 mergePhase);
+  }
+
+  public static <K extends Object, V extends Object>
+  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class keyClass, Class valueClass,
+                            List<Segment> segments,
+                            int mergeFactor, Path tmpDir,
+                            RawComparator comparator, Progressable reporter,
+                            boolean sortSegments,
+                            TezCounter readsCounter,
+                            TezCounter writesCounter,
+                            Progress mergePhase)
+      throws IOException {
+    return new MergeQueue(conf, fs, segments, comparator, reporter,
+                           sortSegments).merge(keyClass, valueClass,
+                                               mergeFactor, tmpDir,
+                                               readsCounter, writesCounter,
+                                               mergePhase);
+  }
+
+  public static <K extends Object, V extends Object>
+  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class keyClass, Class valueClass,
+                            CompressionCodec codec,
+                            List<Segment> segments,
+                            int mergeFactor, Path tmpDir,
+                            RawComparator comparator, Progressable reporter,
+                            boolean sortSegments,
+                            TezCounter readsCounter,
+                            TezCounter writesCounter,
+                            Progress mergePhase)
+      throws IOException {
+    return new MergeQueue(conf, fs, segments, comparator, reporter,
+                           sortSegments, codec).merge(keyClass, valueClass,
+                                               mergeFactor, tmpDir,
+                                               readsCounter, writesCounter,
+                                               mergePhase);
+  }
+
+  public static <K extends Object, V extends Object>
+    TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class keyClass, Class valueClass,
+                            List<Segment> segments,
+                            int mergeFactor, int inMemSegments, Path tmpDir,
+                            RawComparator comparator, Progressable reporter,
+                            boolean sortSegments,
+                            TezCounter readsCounter,
+                            TezCounter writesCounter,
+                            Progress mergePhase)
+      throws IOException {
+    return new MergeQueue(conf, fs, segments, comparator, reporter,
+                           sortSegments).merge(keyClass, valueClass,
+                                               mergeFactor, inMemSegments,
+                                               tmpDir,
+                                               readsCounter, writesCounter,
+                                               mergePhase);
+  }
+
+
+  static <K extends Object, V extends Object>
+  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                          Class keyClass, Class valueClass,
+                          CompressionCodec codec,
+                          List<Segment> segments,
+                          int mergeFactor, int inMemSegments, Path tmpDir,
+                          RawComparator comparator, Progressable reporter,
+                          boolean sortSegments,
+                          TezCounter readsCounter,
+                          TezCounter writesCounter,
+                          Progress mergePhase)
+    throws IOException {
+  return new MergeQueue(conf, fs, segments, comparator, reporter,
+                         sortSegments, codec).merge(keyClass, valueClass,
+                                             mergeFactor, inMemSegments,
+                                             tmpDir,
+                                             readsCounter, writesCounter,
+                                             mergePhase);
+}
+
+  public static <K extends Object, V extends Object>
+  void writeFile(TezRawKeyValueIterator records, Writer writer, 
+                 Progressable progressable, Configuration conf) 
+  throws IOException {
+    long progressBar = 
+        conf.getLong(TezJobConfig.RECORDS_BEFORE_PROGRESS, 
+            TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
+    long recordCtr = 0;
+    while(records.next()) {
+      writer.append(records.getKey(), records.getValue());
+      
+      if (((recordCtr++) % progressBar) == 0) {
+        progressable.progress();
+      }
+    }
+}
+
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public static class Segment<K extends Object, V extends Object> {
+    Reader reader = null;
+    final DataInputBuffer key = new DataInputBuffer();
+    
+    Configuration conf = null;
+    FileSystem fs = null;
+    Path file = null;
+    boolean preserve = false;
+    CompressionCodec codec = null;
+    long segmentOffset = 0;
+    long segmentLength = -1;
+    
+    TezCounter mapOutputsCounter = null;
+
+    public Segment(Configuration conf, FileSystem fs, Path file,
+                   CompressionCodec codec, boolean preserve)
+    throws IOException {
+      this(conf, fs, file, codec, preserve, null);
+    }
+
+    public Segment(Configuration conf, FileSystem fs, Path file,
+                   CompressionCodec codec, boolean preserve,
+                   TezCounter mergedMapOutputsCounter)
+  throws IOException {
+      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve, 
+           mergedMapOutputsCounter);
+    }
+
+    public Segment(Configuration conf, FileSystem fs, Path file,
+                   long segmentOffset, long segmentLength,
+                   CompressionCodec codec,
+                   boolean preserve) throws IOException {
+      this(conf, fs, file, segmentOffset, segmentLength, codec, preserve, null);
+    }
+
+    public Segment(Configuration conf, FileSystem fs, Path file,
+        long segmentOffset, long segmentLength, CompressionCodec codec,
+        boolean preserve, TezCounter mergedMapOutputsCounter)
+    throws IOException {
+      this.conf = conf;
+      this.fs = fs;
+      this.file = file;
+      this.codec = codec;
+      this.preserve = preserve;
+
+      this.segmentOffset = segmentOffset;
+      this.segmentLength = segmentLength;
+      
+      this.mapOutputsCounter = mergedMapOutputsCounter;
+    }
+    
+    public Segment(Reader reader, boolean preserve) {
+      this(reader, preserve, null);
+    }
+    
+    public Segment(Reader reader, boolean preserve, 
+                   TezCounter mapOutputsCounter) {
+      this.reader = reader;
+      this.preserve = preserve;
+      
+      this.segmentLength = reader.getLength();
+      
+      this.mapOutputsCounter = mapOutputsCounter;
+    }
+
+    void init(TezCounter readsCounter) throws IOException {
+      if (reader == null) {
+        FSDataInputStream in = fs.open(file);
+        in.seek(segmentOffset);
+        reader = new Reader(conf, in, segmentLength, codec, readsCounter);
+      }
+      
+      if (mapOutputsCounter != null) {
+        mapOutputsCounter.increment(1);
+      }
+    }
+    
+    boolean inMemory() {
+      return fs == null;
+    }
+    
+    DataInputBuffer getKey() { return key; }
+
+    DataInputBuffer getValue(DataInputBuffer value) throws IOException {
+      nextRawValue(value);
+      return value;
+    }
+
+    public long getLength() { 
+      return (reader == null) ?
+        segmentLength : reader.getLength();
+    }
+    
+    boolean nextRawKey() throws IOException {
+      return reader.nextRawKey(key);
+    }
+
+    void nextRawValue(DataInputBuffer value) throws IOException {
+      reader.nextRawValue(value);
+    }
+
+    void closeReader() throws IOException {
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    }
+    
+    void close() throws IOException {
+      closeReader();
+      if (!preserve && fs != null) {
+        fs.delete(file, false);
+      }
+    }
+
+    public long getPosition() throws IOException {
+      return reader.getPosition();
+    }
+
+    // This method is used by BackupStore to extract the 
+    // absolute position after a reset
+    long getActualPosition() throws IOException {
+      return segmentOffset + reader.getPosition();
+    }
+
+    Reader getReader() {
+      return reader;
+    }
+    
+    // This method is used by BackupStore to reinitialize the
+    // reader to start reading from a different segment offset
+    void reinitReader(int offset) throws IOException {
+      if (!inMemory()) {
+        closeReader();
+        segmentOffset = offset;
+        segmentLength = fs.getFileStatus(file).getLen() - segmentOffset;
+        init(null);
+      }
+    }
+  }
+  
+  // Boolean variable for including/considering final merge as part of sort
+  // phase or not. This is true in map task, false in reduce task. It is
+  // used in calculating mergeProgress.
+  static boolean includeFinalMerge = false;
+  
+  /**
+   * Sets the boolean variable includeFinalMerge to true. Called from
+   * map task before calling merge() so that final merge of map task
+   * is also considered as part of sort phase.
+   */
+  public static void considerFinalMergeForProgress() {
+    includeFinalMerge = true;
+  }
+  
+  private static class MergeQueue<K extends Object, V extends Object> 
+  extends PriorityQueue<Segment> implements TezRawKeyValueIterator {
+    Configuration conf;
+    FileSystem fs;
+    CompressionCodec codec;
+    
+    List<Segment> segments = new ArrayList<Segment>();
+    
+    RawComparator comparator;
+    
+    private long totalBytesProcessed;
+    private float progPerByte;
+    private Progress mergeProgress = new Progress();
+    
+    Progressable reporter;
+    
+    DataInputBuffer key;
+    final DataInputBuffer value = new DataInputBuffer();
+    final DataInputBuffer diskIFileValue = new DataInputBuffer();
+    
+    Segment minSegment;
+    Comparator<Segment> segmentComparator =   
+      new Comparator<Segment>() {
+      public int compare(Segment o1, Segment o2) {
+        if (o1.getLength() == o2.getLength()) {
+          return 0;
+        }
+
+        return o1.getLength() < o2.getLength() ? -1 : 1;
+      }
+    };
+
+    public MergeQueue(Configuration conf, FileSystem fs, 
+                      Path[] inputs, boolean deleteInputs, 
+                      CompressionCodec codec, RawComparator comparator,
+                      Progressable reporter, 
+                      TezCounter mergedMapOutputsCounter) 
+    throws IOException {
+      this.conf = conf;
+      this.fs = fs;
+      this.codec = codec;
+      this.comparator = comparator;
+      this.reporter = reporter;
+      
+      for (Path file : inputs) {
+        LOG.debug("MergeQ: adding: " + file);
+        segments.add(new Segment(conf, fs, file, codec, !deleteInputs, 
+                                       (file.toString().endsWith(
+                                           Constants.MERGED_OUTPUT_PREFIX) ? 
+                                        null : mergedMapOutputsCounter)));
+      }
+      
+      // Sort segments on file-lengths
+      Collections.sort(segments, segmentComparator); 
+    }
+    
+    public MergeQueue(Configuration conf, FileSystem fs, 
+        List<Segment> segments, RawComparator comparator,
+        Progressable reporter, boolean sortSegments) {
+      this.conf = conf;
+      this.fs = fs;
+      this.comparator = comparator;
+      this.segments = segments;
+      this.reporter = reporter;
+      if (sortSegments) {
+        Collections.sort(segments, segmentComparator);
+      }
+    }
+
+    public MergeQueue(Configuration conf, FileSystem fs,
+        List<Segment> segments, RawComparator comparator,
+        Progressable reporter, boolean sortSegments, CompressionCodec codec) {
+      this(conf, fs, segments, comparator, reporter, sortSegments);
+      this.codec = codec;
+    }
+
+    public void close() throws IOException {
+      Segment segment;
+      while((segment = pop()) != null) {
+        segment.close();
+      }
+    }
+
+    public DataInputBuffer getKey() throws IOException {
+      return key;
+    }
+
+    public DataInputBuffer getValue() throws IOException {
+      return value;
+    }
+
+    private void adjustPriorityQueue(Segment reader) throws IOException{
+      long startPos = reader.getPosition();
+      boolean hasNext = reader.nextRawKey();
+      long endPos = reader.getPosition();
+      totalBytesProcessed += endPos - startPos;
+      mergeProgress.set(totalBytesProcessed * progPerByte);
+      if (hasNext) {
+        adjustTop();
+      } else {
+        pop();
+        reader.close();
+      }
+    }
+
+    public boolean next() throws IOException {
+      if (size() == 0)
+        return false;
+
+      if (minSegment != null) {
+        //minSegment is non-null for all invocations of next except the first
+        //one. For the first invocation, the priority queue is ready for use
+        //but for the subsequent invocations, first adjust the queue 
+        adjustPriorityQueue(minSegment);
+        if (size() == 0) {
+          minSegment = null;
+          return false;
+        }
+      }
+      minSegment = top();
+      if (!minSegment.inMemory()) {
+        //When we load the value from an inmemory segment, we reset
+        //the "value" DIB in this class to the inmem segment's byte[].
+        //When we load the value bytes from disk, we shouldn't use
+        //the same byte[] since it would corrupt the data in the inmem
+        //segment. So we maintain an explicit DIB for value bytes
+        //obtained from disk, and if the current segment is a disk
+        //segment, we reset the "value" DIB to the byte[] in that (so 
+        //we reuse the disk segment DIB whenever we consider
+        //a disk segment).
+        value.reset(diskIFileValue.getData(), diskIFileValue.getLength());
+      }
+      long startPos = minSegment.getPosition();
+      key = minSegment.getKey();
+      minSegment.getValue(value);
+      long endPos = minSegment.getPosition();
+      totalBytesProcessed += endPos - startPos;
+      mergeProgress.set(totalBytesProcessed * progPerByte);
+      return true;
+    }
+
+    protected boolean lessThan(Object a, Object b) {
+      DataInputBuffer key1 = ((Segment)a).getKey();
+      DataInputBuffer key2 = ((Segment)b).getKey();
+      int s1 = key1.getPosition();
+      int l1 = key1.getLength() - s1;
+      int s2 = key2.getPosition();
+      int l2 = key2.getLength() - s2;
+
+      return comparator.compare(key1.getData(), s1, l1, key2.getData(), s2, l2) < 0;
+    }
+    
+    public TezRawKeyValueIterator merge(Class keyClass, Class valueClass,
+                                     int factor, Path tmpDir,
+                                     TezCounter readsCounter,
+                                     TezCounter writesCounter,
+                                     Progress mergePhase)
+        throws IOException {
+      return merge(keyClass, valueClass, factor, 0, tmpDir,
+                   readsCounter, writesCounter, mergePhase);
+    }
+
+    TezRawKeyValueIterator merge(Class keyClass, Class valueClass,
+                                     int factor, int inMem, Path tmpDir,
+                                     TezCounter readsCounter,
+                                     TezCounter writesCounter,
+                                     Progress mergePhase)
+        throws IOException {
+      LOG.info("Merging " + segments.size() + " sorted segments");
+
+      /*
+       * If there are inMemory segments, then they come first in the segments
+       * list and then the sorted disk segments. Otherwise(if there are only
+       * disk segments), then they are sorted segments if there are more than
+       * factor segments in the segments list.
+       */
+      int numSegments = segments.size();
+      int origFactor = factor;
+      int passNo = 1;
+      if (mergePhase != null) {
+        mergeProgress = mergePhase;
+      }
+
+      long totalBytes = computeBytesInMerges(factor, inMem);
+      if (totalBytes != 0) {
+        progPerByte = 1.0f / (float)totalBytes;
+      }
+      
+      //create the MergeStreams from the sorted map created in the constructor
+      //and dump the final output to a file
+      do {
+        //get the factor for this pass of merge. We assume in-memory segments
+        //are the first entries in the segment list and that the pass factor
+        //doesn't apply to them
+        factor = getPassFactor(factor, passNo, numSegments - inMem);
+        if (1 == passNo) {
+          factor += inMem;
+        }
+        List<Segment> segmentsToMerge =
+          new ArrayList<Segment>();
+        int segmentsConsidered = 0;
+        int numSegmentsToConsider = factor;
+        long startBytes = 0; // starting bytes of segments of this merge
+        while (true) {
+          //extract the smallest 'factor' number of segments  
+          //Call cleanup on the empty segments (no key/value data)
+          List<Segment> mStream = 
+            getSegmentDescriptors(numSegmentsToConsider);
+          for (Segment segment : mStream) {
+            // Initialize the segment at the last possible moment;
+            // this helps in ensuring we don't use buffers until we need them
+            segment.init(readsCounter);
+            long startPos = segment.getPosition();
+            boolean hasNext = segment.nextRawKey();
+            long endPos = segment.getPosition();
+            
+            if (hasNext) {
+              startBytes += endPos - startPos;
+              segmentsToMerge.add(segment);
+              segmentsConsidered++;
+            }
+            else {
+              segment.close();
+              numSegments--; //we ignore this segment for the merge
+            }
+          }
+          //if we have the desired number of segments
+          //or looked at all available segments, we break
+          if (segmentsConsidered == factor || 
+              segments.size() == 0) {
+            break;
+          }
+            
+          numSegmentsToConsider = factor - segmentsConsidered;
+        }
+        
+        //feed the streams to the priority queue
+        initialize(segmentsToMerge.size());
+        clear();
+        for (Segment segment : segmentsToMerge) {
+          put(segment);
+        }
+        
+        //if we have lesser number of segments remaining, then just return the
+        //iterator, else do another single level merge
+        if (numSegments <= factor) {
+          if (!includeFinalMerge) { // for reduce task
+
+            // Reset totalBytesProcessed and recalculate totalBytes from the
+            // remaining segments to track the progress of the final merge.
+            // Final merge is considered as the progress of the reducePhase,
+            // the 3rd phase of reduce task.
+            totalBytesProcessed = 0;
+            totalBytes = 0;
+            for (int i = 0; i < segmentsToMerge.size(); i++) {
+              totalBytes += segmentsToMerge.get(i).getLength();
+            }
+          }
+          if (totalBytes != 0) //being paranoid
+            progPerByte = 1.0f / (float)totalBytes;
+          
+          totalBytesProcessed += startBytes;         
+          if (totalBytes != 0)
+            mergeProgress.set(totalBytesProcessed * progPerByte);
+          else
+            mergeProgress.set(1.0f); // Last pass and no segments left - we're done
+          
+          LOG.info("Down to the last merge-pass, with " + numSegments + 
+                   " segments left of total size: " +
+                   (totalBytes - totalBytesProcessed) + " bytes");
+          return this;
+        } else {
+          LOG.info("Merging " + segmentsToMerge.size() + 
+                   " intermediate segments out of a total of " + 
+                   (segments.size()+segmentsToMerge.size()));
+          
+          long bytesProcessedInPrevMerges = totalBytesProcessed;
+          totalBytesProcessed += startBytes;
+
+          //we want to spread the creation of temp files on multiple disks if 
+          //available under the space constraints
+          long approxOutputSize = 0; 
+          for (Segment s : segmentsToMerge) {
+            approxOutputSize += s.getLength() + 
+                                ChecksumFileSystem.getApproxChkSumLength(
+                                s.getLength());
+          }
+          Path tmpFilename = 
+            new Path(tmpDir, "intermediate").suffix("." + passNo);
+
+          Path outputFile =  lDirAlloc.getLocalPathForWrite(
+                                              tmpFilename.toString(),
+                                              approxOutputSize, conf);
+
+          Writer writer = 
+            new Writer(conf, fs, outputFile, keyClass, valueClass, codec,
+                             writesCounter);
+          writeFile(this, writer, reporter, conf);
+          writer.close();
+          
+          //we finished one single level merge; now clean up the priority 
+          //queue
+          this.close();
+
+          // Add the newly create segment to the list of segments to be merged
+          Segment tempSegment = 
+            new Segment(conf, fs, outputFile, codec, false);
+
+          // Insert new merged segment into the sorted list
+          int pos = Collections.binarySearch(segments, tempSegment,
+                                             segmentComparator);
+          if (pos < 0) {
+            // binary search failed. So position to be inserted at is -pos-1
+            pos = -pos-1;
+          }
+          segments.add(pos, tempSegment);
+          numSegments = segments.size();
+          
+          // Subtract the difference between expected size of new segment and 
+          // actual size of new segment(Expected size of new segment is
+          // inputBytesOfThisMerge) from totalBytes. Expected size and actual
+          // size will match(almost) if combiner is not called in merge.
+          long inputBytesOfThisMerge = totalBytesProcessed -
+                                       bytesProcessedInPrevMerges;
+          totalBytes -= inputBytesOfThisMerge - tempSegment.getLength();
+          if (totalBytes != 0) {
+            progPerByte = 1.0f / (float)totalBytes;
+          }
+          
+          passNo++;
+        }
+        //we are worried about only the first pass merge factor. So reset the 
+        //factor to what it originally was
+        factor = origFactor;
+      } while(true);
+    }
+    
+    /**
+     * Determine the number of segments to merge in a given pass. Assuming more
+     * than factor segments, the first pass should attempt to bring the total
+     * number of segments - 1 to be divisible by the factor - 1 (each pass
+     * takes X segments and produces 1) to minimize the number of merges.
+     */
+    private int getPassFactor(int factor, int passNo, int numSegments) {
+      if (passNo > 1 || numSegments <= factor || factor == 1) 
+        return factor;
+      int mod = (numSegments - 1) % (factor - 1);
+      if (mod == 0)
+        return factor;
+      return mod + 1;
+    }
+    
+    /** Return (& remove) the requested number of segment descriptors from the
+     * sorted map.
+     */
+    private List<Segment> getSegmentDescriptors(int numDescriptors) {
+      if (numDescriptors > segments.size()) {
+        List<Segment> subList = new ArrayList<Segment>(segments);
+        segments.clear();
+        return subList;
+      }
+      
+      List<Segment> subList = 
+        new ArrayList<Segment>(segments.subList(0, numDescriptors));
+      for (int i=0; i < numDescriptors; ++i) {
+        segments.remove(0);
+      }
+      return subList;
+    }
+    
+    /**
+     * Compute expected size of input bytes to merges, will be used in
+     * calculating mergeProgress. This simulates the above merge() method and
+     * tries to obtain the number of bytes that are going to be merged in all
+     * merges(assuming that there is no combiner called while merging).
+     * @param factor mapreduce.task.io.sort.factor
+     * @param inMem  number of segments in memory to be merged
+     */
+    long computeBytesInMerges(int factor, int inMem) {
+      int numSegments = segments.size();
+      List<Long> segmentSizes = new ArrayList<Long>(numSegments);
+      long totalBytes = 0;
+      int n = numSegments - inMem;
+      // factor for 1st pass
+      int f = getPassFactor(factor, 1, n) + inMem;
+      n = numSegments;
+ 
+      for (int i = 0; i < numSegments; i++) {
+        // Not handling empty segments here assuming that it would not affect
+        // much in calculation of mergeProgress.
+        segmentSizes.add(segments.get(i).getLength());
+      }
+      
+      // If includeFinalMerge is true, allow the following while loop iterate
+      // for 1 more iteration. This is to include final merge as part of the
+      // computation of expected input bytes of merges
+      boolean considerFinalMerge = includeFinalMerge;
+      
+      while (n > f || considerFinalMerge) {
+        if (n <=f ) {
+          considerFinalMerge = false;
+        }
+        long mergedSize = 0;
+        f = Math.min(f, segmentSizes.size());
+        for (int j = 0; j < f; j++) {
+          mergedSize += segmentSizes.remove(0);
+        }
+        totalBytes += mergedSize;
+        
+        // insert new size into the sorted list
+        int pos = Collections.binarySearch(segmentSizes, mergedSize);
+        if (pos < 0) {
+          pos = -pos-1;
+        }
+        segmentSizes.add(pos, mergedSize);
+        
+        n -= (f-1);
+        f = factor;
+      }
+
+      return totalBytes;
+    }
+
+    public Progress getProgress() {
+      return mergeProgress;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java
new file mode 100644
index 0000000..3a2c2bf
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.Progress;
+
+/**
+ * <code>TezRawKeyValueIterator</code> is an iterator used to iterate over
+ * the raw keys and values during sort/merge of intermediate data. 
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface TezRawKeyValueIterator {
+  /** 
+   * Gets the current raw key.
+   * 
+   * @return Gets the current raw key as a DataInputBuffer
+   * @throws IOException
+   */
+  DataInputBuffer getKey() throws IOException;
+  
+  /** 
+   * Gets the current raw value.
+   * 
+   * @return Gets the current raw value as a DataInputBuffer 
+   * @throws IOException
+   */
+  DataInputBuffer getValue() throws IOException;
+  
+  /** 
+   * Sets up the current key and value (for getKey and getValue).
+   * 
+   * @return <code>true</code> if there exists a key/value, 
+   *         <code>false</code> otherwise. 
+   * @throws IOException
+   */
+  boolean next() throws IOException;
+  
+  /** 
+   * Closes the iterator so that the underlying streams can be closed.
+   * 
+   * @throws IOException
+   */
+  void close() throws IOException;
+  
+  /** Gets the Progress object; this has a float (0.0 - 1.0) 
+   * indicating the bytes processed by the iterator so far
+   */
+  Progress getProgress();
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
new file mode 100644
index 0000000..ab4142b
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.tez.runtime.library.common.Constants;
+
+public class TezSpillRecord {
+
+  /** Backing store */
+  private final ByteBuffer buf;
+  /** View of backing storage as longs */
+  private final LongBuffer entries;
+
+  public TezSpillRecord(int numPartitions) {
+    buf = ByteBuffer.allocate(
+        numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH);
+    entries = buf.asLongBuffer();
+  }
+
+  public TezSpillRecord(Path indexFileName, Configuration job) throws IOException {
+    this(indexFileName, job, null);
+  }
+
+  public TezSpillRecord(Path indexFileName, Configuration job, String expectedIndexOwner)
+    throws IOException {
+    this(indexFileName, job, new PureJavaCrc32(), expectedIndexOwner);
+  }
+
+  public TezSpillRecord(Path indexFileName, Configuration job, Checksum crc,
+                     String expectedIndexOwner)
+      throws IOException {
+
+    final FileSystem rfs = FileSystem.getLocal(job).getRaw();
+    final FSDataInputStream in = rfs.open(indexFileName);
+    try {
+      final long length = rfs.getFileStatus(indexFileName).getLen();
+      final int partitions = 
+          (int) length / Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+      final int size = partitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+
+      buf = ByteBuffer.allocate(size);
+      if (crc != null) {
+        crc.reset();
+        CheckedInputStream chk = new CheckedInputStream(in, crc);
+        IOUtils.readFully(chk, buf.array(), 0, size);
+        if (chk.getChecksum().getValue() != in.readLong()) {
+          throw new ChecksumException("Checksum error reading spill index: " +
+                                indexFileName, -1);
+        }
+      } else {
+        IOUtils.readFully(in, buf.array(), 0, size);
+      }
+      entries = buf.asLongBuffer();
+    } finally {
+      in.close();
+    }
+  }
+
+  /**
+   * Return number of IndexRecord entries in this spill.
+   */
+  public int size() {
+    return entries.capacity() / (Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8);
+  }
+
+  /**
+   * Get spill offsets for given partition.
+   */
+  public TezIndexRecord getIndex(int partition) {
+    final int pos = partition * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8;
+    return new TezIndexRecord(entries.get(pos), entries.get(pos + 1),
+                           entries.get(pos + 2));
+  }
+
+  /**
+   * Set spill offsets for given partition.
+   */
+  public void putIndex(TezIndexRecord rec, int partition) {
+    final int pos = partition * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8;
+    entries.put(pos, rec.getStartOffset());
+    entries.put(pos + 1, rec.getRawLength());
+    entries.put(pos + 2, rec.getPartLength());
+  }
+
+  /**
+   * Write this spill record to the location provided.
+   */
+  public void writeToFile(Path loc, Configuration job)
+      throws IOException {
+    writeToFile(loc, job, new PureJavaCrc32());
+  }
+
+  public void writeToFile(Path loc, Configuration job, Checksum crc)
+      throws IOException {
+    final FileSystem rfs = FileSystem.getLocal(job).getRaw();
+    CheckedOutputStream chk = null;
+    final FSDataOutputStream out = rfs.create(loc);
+    try {
+      if (crc != null) {
+        crc.reset();
+        chk = new CheckedOutputStream(out, crc);
+        chk.write(buf.array());
+        out.writeLong(chk.getChecksum().getValue());
+      } else {
+        out.write(buf.array());
+      }
+    } finally {
+      if (chk != null) {
+        chk.close();
+      } else {
+        out.close();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
new file mode 100644
index 0000000..1ff486f
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -0,0 +1,1108 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common.sort.impl.dflt;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class DefaultSorter extends ExternalSorter implements IndexedSortable {
+  
+  private static final Log LOG = LogFactory.getLog(DefaultSorter.class);
+
+  // TODO NEWTEZ Progress reporting to Tez framework. (making progress vs %complete)
+  
+  /**
+   * The size of each record in the index file for the map-outputs.
+   */
+  public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
+
+  private final static int APPROX_HEADER_LENGTH = 150;
+
+  // k/v accounting
+  IntBuffer kvmeta; // metadata overlay on backing store
+  int kvstart;            // marks origin of spill metadata
+  int kvend;              // marks end of spill metadata
+  int kvindex;            // marks end of fully serialized records
+
+  int equator;            // marks origin of meta/serialization
+  int bufstart;           // marks beginning of spill
+  int bufend;             // marks beginning of collectable
+  int bufmark;            // marks end of record
+  int bufindex;           // marks end of collected
+  int bufvoid;            // marks the point where we should stop
+                          // reading at the end of the buffer
+
+  byte[] kvbuffer;        // main output buffer
+  private final byte[] b0 = new byte[0];
+
+  protected static final int INDEX = 0;            // index offset in acct
+  protected static final int VALSTART = 1;         // val offset in acct
+  protected static final int KEYSTART = 2;         // key offset in acct
+  protected static final int PARTITION = 3;        // partition offset in acct
+  protected static final int NMETA = 4;            // num meta ints
+  protected static final int METASIZE = NMETA * 4; // size in bytes
+
+  // spill accounting
+  int maxRec;
+  int softLimit;
+  boolean spillInProgress;
+  int bufferRemaining;
+  volatile Throwable sortSpillException = null;
+
+  int numSpills = 0;
+  int minSpillsForCombine;
+  final ReentrantLock spillLock = new ReentrantLock();
+  final Condition spillDone = spillLock.newCondition();
+  final Condition spillReady = spillLock.newCondition();
+  final BlockingBuffer bb = new BlockingBuffer();
+  volatile boolean spillThreadRunning = false;
+  final SpillThread spillThread = new SpillThread();
+
+  final ArrayList<TezSpillRecord> indexCacheList =
+    new ArrayList<TezSpillRecord>();
+  private int totalIndexCacheMemory;
+  private int indexCacheMemoryLimit;
+
+  @Override
+  public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException { 
+    super.initialize(outputContext, conf, numOutputs);
+
+    // sanity checks
+    final float spillper = this.conf.getFloat(
+        TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT,
+        TezJobConfig.DEFAULT_TEZ_RUNTIME_SORT_SPILL_PERCENT);
+    final int sortmb = this.conf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB,
+        TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_MB);
+    if (spillper > (float) 1.0 || spillper <= (float) 0.0) {
+      throw new IOException("Invalid \""
+          + TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT + "\": " + spillper);
+    }
+    if ((sortmb & 0x7FF) != sortmb) {
+      throw new IOException("Invalid \"" + TezJobConfig.TEZ_RUNTIME_IO_SORT_MB
+          + "\": " + sortmb);
+    }
+
+    indexCacheMemoryLimit = this.conf.getInt(TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES,
+                                       TezJobConfig.DEFAULT_TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
+
+    // buffers and accounting
+    int maxMemUsage = sortmb << 20;
+    maxMemUsage -= maxMemUsage % METASIZE;
+    kvbuffer = new byte[maxMemUsage];
+    bufvoid = kvbuffer.length;
+    kvmeta = ByteBuffer.wrap(kvbuffer)
+       .order(ByteOrder.nativeOrder())
+       .asIntBuffer();
+    setEquator(0);
+    bufstart = bufend = bufindex = equator;
+    kvstart = kvend = kvindex;
+
+    maxRec = kvmeta.capacity() / NMETA;
+    softLimit = (int)(kvbuffer.length * spillper);
+    bufferRemaining = softLimit;
+    if (LOG.isInfoEnabled()) {
+      LOG.info(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB + ": " + sortmb);
+      LOG.info("soft limit at " + softLimit);
+      LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
+      LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
+    }
+
+    // k/v serialization
+    valSerializer.open(bb);
+    keySerializer.open(bb);
+
+    spillInProgress = false;
+    minSpillsForCombine = this.conf.getInt(TezJobConfig.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
+    spillThread.setDaemon(true);
+    spillThread.setName("SpillThread");
+    spillLock.lock();
+    try {
+      spillThread.start();
+      while (!spillThreadRunning) {
+        spillDone.await();
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Spill thread failed to initialize", e);
+    } finally {
+      spillLock.unlock();
+    }
+    if (sortSpillException != null) {
+      throw new IOException("Spill thread failed to initialize",
+          sortSpillException);
+    }
+  }
+
+  @Override
+  public void write(Object key, Object value)
+      throws IOException {
+    collect(
+        key, value, partitioner.getPartition(key, value, partitions));
+  }
+
+  /**
+   * Serialize the key, value to intermediate storage.
+   * When this method returns, kvindex must refer to sufficient unused
+   * storage to store one METADATA.
+   */
+  synchronized void collect(Object key, Object value, final int partition
+                                   ) throws IOException {
+
+    if (key.getClass() != keyClass) {
+      throw new IOException("Type mismatch in key from map: expected "
+                            + keyClass.getName() + ", received "
+                            + key.getClass().getName());
+    }
+    if (value.getClass() != valClass) {
+      throw new IOException("Type mismatch in value from map: expected "
+                            + valClass.getName() + ", received "
+                            + value.getClass().getName());
+    }
+    if (partition < 0 || partition >= partitions) {
+      throw new IOException("Illegal partition for " + key + " (" +
+          partition + ")" + ", TotalPartitions: " + partitions);
+    }
+    checkSpillException();
+    bufferRemaining -= METASIZE;
+    if (bufferRemaining <= 0) {
+      // start spill if the thread is not running and the soft limit has been
+      // reached
+      spillLock.lock();
+      try {
+        do {
+          if (!spillInProgress) {
+            final int kvbidx = 4 * kvindex;
+            final int kvbend = 4 * kvend;
+            // serialized, unspilled bytes always lie between kvindex and
+            // bufindex, crossing the equator. Note that any void space
+            // created by a reset must be included in "used" bytes
+            final int bUsed = distanceTo(kvbidx, bufindex);
+            final boolean bufsoftlimit = bUsed >= softLimit;
+            if ((kvbend + METASIZE) % kvbuffer.length !=
+                equator - (equator % METASIZE)) {
+              // spill finished, reclaim space
+              resetSpill();
+              bufferRemaining = Math.min(
+                  distanceTo(bufindex, kvbidx) - 2 * METASIZE,
+                  softLimit - bUsed) - METASIZE;
+              continue;
+            } else if (bufsoftlimit && kvindex != kvend) {
+              // spill records, if any collected; check latter, as it may
+              // be possible for metadata alignment to hit spill pcnt
+              startSpill();
+              final int avgRec = (int)
+                (mapOutputByteCounter.getValue() /
+                mapOutputRecordCounter.getValue());
+              // leave at least half the split buffer for serialization data
+              // ensure that kvindex >= bufindex
+              final int distkvi = distanceTo(bufindex, kvbidx);
+              final int newPos = (bufindex +
+                Math.max(2 * METASIZE - 1,
+                        Math.min(distkvi / 2,
+                                 distkvi / (METASIZE + avgRec) * METASIZE)))
+                % kvbuffer.length;
+              setEquator(newPos);
+              bufmark = bufindex = newPos;
+              final int serBound = 4 * kvend;
+              // bytes remaining before the lock must be held and limits
+              // checked is the minimum of three arcs: the metadata space, the
+              // serialization space, and the soft limit
+              bufferRemaining = Math.min(
+                  // metadata max
+                  distanceTo(bufend, newPos),
+                  Math.min(
+                    // serialization max
+                    distanceTo(newPos, serBound),
+                    // soft limit
+                    softLimit)) - 2 * METASIZE;
+            }
+          }
+        } while (false);
+      } finally {
+        spillLock.unlock();
+      }
+    }
+
+    try {
+      // serialize key bytes into buffer
+      int keystart = bufindex;
+      keySerializer.serialize(key);
+      if (bufindex < keystart) {
+        // wrapped the key; must make contiguous
+        bb.shiftBufferedKey();
+        keystart = 0;
+      }
+      // serialize value bytes into buffer
+      final int valstart = bufindex;
+      valSerializer.serialize(value);
+      // It's possible for records to have zero length, i.e. the serializer
+      // will perform no writes. To ensure that the boundary conditions are
+      // checked and that the kvindex invariant is maintained, perform a
+      // zero-length write into the buffer. The logic monitoring this could be
+      // moved into collect, but this is cleaner and inexpensive. For now, it
+      // is acceptable.
+      bb.write(b0, 0, 0);
+
+      // the record must be marked after the preceding write, as the metadata
+      // for this record are not yet written
+      int valend = bb.markRecord();
+
+      mapOutputRecordCounter.increment(1);
+      mapOutputByteCounter.increment(
+          distanceTo(keystart, valend, bufvoid));
+
+      // write accounting info
+      kvmeta.put(kvindex + INDEX, kvindex);
+      kvmeta.put(kvindex + PARTITION, partition);
+      kvmeta.put(kvindex + KEYSTART, keystart);
+      kvmeta.put(kvindex + VALSTART, valstart);
+      // advance kvindex
+      kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
+    } catch (MapBufferTooSmallException e) {
+      LOG.info("Record too large for in-memory buffer: " + e.getMessage());
+      spillSingleRecord(key, value, partition);
+      mapOutputRecordCounter.increment(1);
+      return;
+    }
+  }
+
+  /**
+   * Set the point from which meta and serialization data expand. The meta
+   * indices are aligned with the buffer, so metadata never spans the ends of
+   * the circular buffer.
+   */
+  private void setEquator(int pos) {
+    equator = pos;
+    // set index prior to first entry, aligned at meta boundary
+    final int aligned = pos - (pos % METASIZE);
+    kvindex =
+      ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
+    if (LOG.isInfoEnabled()) {
+      LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
+          "(" + (kvindex * 4) + ")");
+    }
+  }
+
+  /**
+   * The spill is complete, so set the buffer and meta indices to be equal to
+   * the new equator to free space for continuing collection. Note that when
+   * kvindex == kvend == kvstart, the buffer is empty.
+   */
+  private void resetSpill() {
+    final int e = equator;
+    bufstart = bufend = e;
+    final int aligned = e - (e % METASIZE);
+    // set start/end to point to first meta record
+    kvstart = kvend =
+      ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
+    if (LOG.isInfoEnabled()) {
+      LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
+        (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
+    }
+  }
+
+  /**
+   * Compute the distance in bytes between two indices in the serialization
+   * buffer.
+   * @see #distanceTo(int,int,int)
+   */
+  final int distanceTo(final int i, final int j) {
+    return distanceTo(i, j, kvbuffer.length);
+  }
+
+  /**
+   * Compute the distance between two indices in the circular buffer given the
+   * max distance.
+   */
+  int distanceTo(final int i, final int j, final int mod) {
+    return i <= j
+      ? j - i
+      : mod - i + j;
+  }
+
+  /**
+   * For the given meta position, return the dereferenced position in the
+   * integer array. Each meta block contains several integers describing
+   * record data in its serialized form, but the INDEX is not necessarily
+   * related to the proximate metadata. The index value at the referenced int
+   * position is the start offset of the associated metadata block. So the
+   * metadata INDEX at metapos may point to the metadata described by the
+   * metadata block at metapos + k, which contains information about that
+   * serialized record.
+   */
+  int offsetFor(int metapos) {
+    return kvmeta.get((metapos % maxRec) * NMETA + INDEX);
+  }
+
+  /**
+   * Compare logical range, st i, j MOD offset capacity.
+   * Compare by partition, then by key.
+   * @see IndexedSortable#compare
+   */
+  public int compare(final int mi, final int mj) {
+    final int kvi = offsetFor(mi);
+    final int kvj = offsetFor(mj);
+    final int kvip = kvmeta.get(kvi + PARTITION);
+    final int kvjp = kvmeta.get(kvj + PARTITION);
+    // sort by partition
+    if (kvip != kvjp) {
+      return kvip - kvjp;
+    }
+    // sort by key
+    return comparator.compare(kvbuffer,
+        kvmeta.get(kvi + KEYSTART),
+        kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART),
+        kvbuffer,
+        kvmeta.get(kvj + KEYSTART),
+        kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));
+  }
+
+  /**
+   * Swap logical indices st i, j MOD offset capacity.
+   * @see IndexedSortable#swap
+   */
+  public void swap(final int mi, final int mj) {
+    final int kvi = (mi % maxRec) * NMETA + INDEX;
+    final int kvj = (mj % maxRec) * NMETA + INDEX;
+    int tmp = kvmeta.get(kvi);
+    kvmeta.put(kvi, kvmeta.get(kvj));
+    kvmeta.put(kvj, tmp);
+  }
+
+  /**
+   * Inner class managing the spill of serialized records to disk.
+   */
+  protected class BlockingBuffer extends DataOutputStream {
+
+    public BlockingBuffer() {
+      super(new Buffer());
+    }
+
+    /**
+     * Mark end of record. Note that this is required if the buffer is to
+     * cut the spill in the proper place.
+     */
+    public int markRecord() {
+      bufmark = bufindex;
+      return bufindex;
+    }
+
+    /**
+     * Set position from last mark to end of writable buffer, then rewrite
+     * the data between last mark and kvindex.
+     * This handles a special case where the key wraps around the buffer.
+     * If the key is to be passed to a RawComparator, then it must be
+     * contiguous in the buffer. This recopies the data in the buffer back
+     * into itself, but starting at the beginning of the buffer. Note that
+     * this method should <b>only</b> be called immediately after detecting
+     * this condition. To call it at any other time is undefined and would
+     * likely result in data loss or corruption.
+     * @see #markRecord()
+     */
+    protected void shiftBufferedKey() throws IOException {
+      // spillLock unnecessary; both kvend and kvindex are current
+      int headbytelen = bufvoid - bufmark;
+      bufvoid = bufmark;
+      final int kvbidx = 4 * kvindex;
+      final int kvbend = 4 * kvend;
+      final int avail =
+        Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend));
+      if (bufindex + headbytelen < avail) {
+        System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
+        System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
+        bufindex += headbytelen;
+        bufferRemaining -= kvbuffer.length - bufvoid;
+      } else {
+        byte[] keytmp = new byte[bufindex];
+        System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
+        bufindex = 0;
+        out.write(kvbuffer, bufmark, headbytelen);
+        out.write(keytmp);
+      }
+    }
+  }
+
+  public class Buffer extends OutputStream {
+    private final byte[] scratch = new byte[1];
+
+    @Override
+    public void write(int v)
+        throws IOException {
+      scratch[0] = (byte)v;
+      write(scratch, 0, 1);
+    }
+
+    /**
+     * Attempt to write a sequence of bytes to the collection buffer.
+     * This method will block if the spill thread is running and it
+     * cannot write.
+     * @throws MapBufferTooSmallException if record is too large to
+     *    deserialize into the collection buffer.
+     */
+    @Override
+    public void write(byte b[], int off, int len)
+        throws IOException {
+      // must always verify the invariant that at least METASIZE bytes are
+      // available beyond kvindex, even when len == 0
+      bufferRemaining -= len;
+      if (bufferRemaining <= 0) {
+        // writing these bytes could exhaust available buffer space or fill
+        // the buffer to soft limit. check if spill or blocking are necessary
+        boolean blockwrite = false;
+        spillLock.lock();
+        try {
+          do {
+            checkSpillException();
+
+            final int kvbidx = 4 * kvindex;
+            final int kvbend = 4 * kvend;
+            // ser distance to key index
+            final int distkvi = distanceTo(bufindex, kvbidx);
+            // ser distance to spill end index
+            final int distkve = distanceTo(bufindex, kvbend);
+
+            // if kvindex is closer than kvend, then a spill is neither in
+            // progress nor complete and reset since the lock was held. The
+            // write should block only if there is insufficient space to
+            // complete the current write, write the metadata for this record,
+            // and write the metadata for the next record. If kvend is closer,
+            // then the write should block if there is too little space for
+            // either the metadata or the current write. Note that collect
+            // ensures its metadata requirement with a zero-length write
+            blockwrite = distkvi <= distkve
+              ? distkvi <= len + 2 * METASIZE
+              : distkve <= len || distanceTo(bufend, kvbidx) < 2 * METASIZE;
+
+            if (!spillInProgress) {
+              if (blockwrite) {
+                if ((kvbend + METASIZE) % kvbuffer.length !=
+                    equator - (equator % METASIZE)) {
+                  // spill finished, reclaim space
+                  // need to use meta exclusively; zero-len rec & 100% spill
+                  // pcnt would fail
+                  resetSpill(); // resetSpill doesn't move bufindex, kvindex
+                  bufferRemaining = Math.min(
+                      distkvi - 2 * METASIZE,
+                      softLimit - distanceTo(kvbidx, bufindex)) - len;
+                  continue;
+                }
+                // we have records we can spill; only spill if blocked
+                if (kvindex != kvend) {
+                  startSpill();
+                  // Blocked on this write, waiting for the spill just
+                  // initiated to finish. Instead of repositioning the marker
+                  // and copying the partial record, we set the record start
+                  // to be the new equator
+                  setEquator(bufmark);
+                } else {
+                  // We have no buffered records, and this record is too large
+                  // to write into kvbuffer. We must spill it directly from
+                  // collect
+                  final int size = distanceTo(bufstart, bufindex) + len;
+                  setEquator(0);
+                  bufstart = bufend = bufindex = equator;
+                  kvstart = kvend = kvindex;
+                  bufvoid = kvbuffer.length;
+                  throw new MapBufferTooSmallException(size + " bytes");
+                }
+              }
+            }
+
+            if (blockwrite) {
+              // wait for spill
+              try {
+                while (spillInProgress) {
+                  spillDone.await();
+                }
+              } catch (InterruptedException e) {
+                  throw new IOException(
+                      "Buffer interrupted while waiting for the writer", e);
+              }
+            }
+          } while (blockwrite);
+        } finally {
+          spillLock.unlock();
+        }
+      }
+      // here, we know that we have sufficient space to write
+      if (bufindex + len > bufvoid) {
+        final int gaplen = bufvoid - bufindex;
+        System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
+        len -= gaplen;
+        off += gaplen;
+        bufindex = 0;
+      }
+      System.arraycopy(b, off, kvbuffer, bufindex, len);
+      bufindex += len;
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    LOG.info("Starting flush of map output");
+    spillLock.lock();
+    try {
+      while (spillInProgress) {
+        spillDone.await();
+      }
+      checkSpillException();
+
+      final int kvbend = 4 * kvend;
+      if ((kvbend + METASIZE) % kvbuffer.length !=
+          equator - (equator % METASIZE)) {
+        // spill finished
+        resetSpill();
+      }
+      if (kvindex != kvend) {
+        kvend = (kvindex + NMETA) % kvmeta.capacity();
+        bufend = bufmark;
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Sorting & Spilling map output");
+          LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
+                   "; bufvoid = " + bufvoid);
+          LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
+                   "); kvend = " + kvend + "(" + (kvend * 4) +
+                   "); length = " + (distanceTo(kvend, kvstart,
+                         kvmeta.capacity()) + 1) + "/" + maxRec);
+        }
+        sortAndSpill();
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted while waiting for the writer", e);
+    } finally {
+      spillLock.unlock();
+    }
+    assert !spillLock.isHeldByCurrentThread();
+    // shut down spill thread and wait for it to exit. Since the preceding
+    // ensures that it is finished with its work (and sortAndSpill did not
+    // throw), we elect to use an interrupt instead of setting a flag.
+    // Spilling simultaneously from this thread while the spill thread
+    // finishes its work might be both a useful way to extend this and also
+    // sufficient motivation for the latter approach.
+    try {
+      spillThread.interrupt();
+      spillThread.join();
+    } catch (InterruptedException e) {
+      throw new IOException("Spill failed", e);
+    }
+    // release sort buffer before the merge
+    //FIXME
+    //kvbuffer = null;
+    mergeParts();
+    Path outputPath = mapOutputFile.getOutputFile();
+    fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
+  }
+
+  @Override
+  public void close() throws IOException { }
+
+  protected class SpillThread extends Thread {
+
+    @Override
+    public void run() {
+      spillLock.lock();
+      spillThreadRunning = true;
+      try {
+        while (true) {
+          spillDone.signal();
+          while (!spillInProgress) {
+            spillReady.await();
+          }
+          try {
+            spillLock.unlock();
+            sortAndSpill();
+          } catch (Throwable t) {
+            LOG.warn("Got an exception in sortAndSpill", t);
+            sortSpillException = t;
+          } finally {
+            spillLock.lock();
+            if (bufend < bufstart) {
+              bufvoid = kvbuffer.length;
+            }
+            kvstart = kvend;
+            bufstart = bufend;
+            spillInProgress = false;
+          }
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } finally {
+        spillLock.unlock();
+        spillThreadRunning = false;
+      }
+    }
+  }
+
+  private void checkSpillException() throws IOException {
+    final Throwable lspillException = sortSpillException;
+    if (lspillException != null) {
+      if (lspillException instanceof Error) {
+        final String logMsg = "Task " + outputContext.getUniqueIdentifier()
+            + " failed : " + StringUtils.stringifyException(lspillException);
+        outputContext.fatalError(lspillException, logMsg);
+      }
+      throw new IOException("Spill failed", lspillException);
+    }
+  }
+
+  private void startSpill() {
+    assert !spillInProgress;
+    kvend = (kvindex + NMETA) % kvmeta.capacity();
+    bufend = bufmark;
+    spillInProgress = true;
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Spilling map output");
+      LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
+               "; bufvoid = " + bufvoid);
+      LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
+               "); kvend = " + kvend + "(" + (kvend * 4) +
+               "); length = " + (distanceTo(kvend, kvstart,
+                     kvmeta.capacity()) + 1) + "/" + maxRec);
+    }
+    spillReady.signal();
+  }
+
+  int getMetaStart() {
+    return kvend / NMETA;
+  }
+
+  int getMetaEnd() {
+    return 1 + // kvend is a valid record
+        (kvstart >= kvend
+        ? kvstart
+        : kvmeta.capacity() + kvstart) / NMETA;
+  }
+
+  protected void sortAndSpill()
+      throws IOException, InterruptedException {
+    final int mstart = getMetaStart();
+    final int mend = getMetaEnd();
+    sorter.sort(this, mstart, mend, nullProgressable);
+    spill(mstart, mend);
+  }
+
+  protected void spill(int mstart, int mend)
+      throws IOException, InterruptedException {
+
+    //approximate the length of the output file to be the length of the
+    //buffer + header lengths for the partitions
+    final long size = (bufend >= bufstart
+        ? bufend - bufstart
+        : (bufvoid - bufend) + bufstart) +
+                partitions * APPROX_HEADER_LENGTH;
+    FSDataOutputStream out = null;
+    try {
+      // create spill file
+      final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+      final Path filename =
+          mapOutputFile.getSpillFileForWrite(numSpills, size);
+      out = rfs.create(filename);
+
+      int spindex = mstart;
+      final InMemValBytes value = createInMemValBytes();
+      for (int i = 0; i < partitions; ++i) {
+        IFile.Writer writer = null;
+        try {
+          long segmentStart = out.getPos();
+          writer = new Writer(conf, out, keyClass, valClass, codec,
+                                    spilledRecordsCounter);
+          if (combiner == null) {
+            // spill directly
+            DataInputBuffer key = new DataInputBuffer();
+            while (spindex < mend &&
+                kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
+              final int kvoff = offsetFor(spindex);
+              key.reset(
+                  kvbuffer,
+                  kvmeta.get(kvoff + KEYSTART),
+                  (kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART))
+                  );
+              getVBytesForOffset(kvoff, value);
+              writer.append(key, value);
+              ++spindex;
+            }
+          } else {
+            int spstart = spindex;
+            while (spindex < mend &&
+                kvmeta.get(offsetFor(spindex)
+                          + PARTITION) == i) {
+              ++spindex;
+            }
+            // Note: we would like to avoid the combiner if we've fewer
+            // than some threshold of records for a partition
+            if (spstart != spindex) {
+              TezRawKeyValueIterator kvIter =
+                new MRResultIterator(spstart, spindex);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Running combine processor");
+              }
+              runCombineProcessor(kvIter, writer);
+            }
+          }
+
+          // close the writer
+          writer.close();
+
+          // record offsets
+          final TezIndexRecord rec =
+              new TezIndexRecord(
+                  segmentStart,
+                  writer.getRawLength(),
+                  writer.getCompressedLength());
+          spillRec.putIndex(rec, i);
+
+          writer = null;
+        } finally {
+          if (null != writer) writer.close();
+        }
+      }
+
+      if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
+        // create spill index file
+        Path indexFilename =
+            mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+                * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+        spillRec.writeToFile(indexFilename, conf);
+      } else {
+        indexCacheList.add(spillRec);
+        totalIndexCacheMemory +=
+          spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+      }
+      LOG.info("Finished spill " + numSpills);
+      ++numSpills;
+    } finally {
+      if (out != null) out.close();
+    }
+  }
+
+  /**
+   * Handles the degenerate case where serialization fails to fit in
+   * the in-memory buffer, so we must spill the record from collect
+   * directly to a spill file. Consider this "losing".
+   */
+  private void spillSingleRecord(final Object key, final Object value,
+                                 int partition) throws IOException {
+    long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
+    FSDataOutputStream out = null;
+    try {
+      // create spill file
+      final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+      final Path filename =
+          mapOutputFile.getSpillFileForWrite(numSpills, size);
+      out = rfs.create(filename);
+
+      // we don't run the combiner for a single record
+      for (int i = 0; i < partitions; ++i) {
+        IFile.Writer writer = null;
+        try {
+          long segmentStart = out.getPos();
+          // Create a new codec, don't care!
+          writer = new IFile.Writer(conf, out, keyClass, valClass, codec,
+                                          spilledRecordsCounter);
+
+          if (i == partition) {
+            final long recordStart = out.getPos();
+            writer.append(key, value);
+            // Note that our map byte count will not be accurate with
+            // compression
+            mapOutputByteCounter.increment(out.getPos() - recordStart);
+          }
+          writer.close();
+
+          // record offsets
+          TezIndexRecord rec =
+              new TezIndexRecord(
+                  segmentStart,
+                  writer.getRawLength(),
+                  writer.getCompressedLength());
+          spillRec.putIndex(rec, i);
+
+          writer = null;
+        } catch (IOException e) {
+          if (null != writer) writer.close();
+          throw e;
+        }
+      }
+      if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
+        // create spill index file
+        Path indexFilename =
+            mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+                * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+        spillRec.writeToFile(indexFilename, conf);
+      } else {
+        indexCacheList.add(spillRec);
+        totalIndexCacheMemory +=
+          spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+      }
+      ++numSpills;
+    } finally {
+      if (out != null) out.close();
+    }
+  }
+
+  protected int getInMemVBytesLength(int kvoff) {
+    // get the keystart for the next serialized value to be the end
+    // of this value. If this is the last value in the buffer, use bufend
+    final int nextindex = kvoff == kvend
+      ? bufend
+      : kvmeta.get(
+          (kvoff - NMETA + kvmeta.capacity() + KEYSTART) % kvmeta.capacity());
+    // calculate the length of the value
+    int vallen = (nextindex >= kvmeta.get(kvoff + VALSTART))
+      ? nextindex - kvmeta.get(kvoff + VALSTART)
+      : (bufvoid - kvmeta.get(kvoff + VALSTART)) + nextindex;
+      return vallen;
+  }
+
+  /**
+   * Given an offset, populate vbytes with the associated set of
+   * deserialized value bytes. Should only be called during a spill.
+   */
+  int getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
+    int vallen = getInMemVBytesLength(kvoff);
+    vbytes.reset(kvbuffer, kvmeta.get(kvoff + VALSTART), vallen);
+    return vallen;
+  }
+
+  /**
+   * Inner class wrapping valuebytes, used for appendRaw.
+   */
+  static class InMemValBytes extends DataInputBuffer {
+    private byte[] buffer;
+    private int start;
+    private int length;
+    private final int bufvoid;
+
+    public InMemValBytes(int bufvoid) {
+      this.bufvoid = bufvoid;
+    }
+
+    public void reset(byte[] buffer, int start, int length) {
+      this.buffer = buffer;
+      this.start = start;
+      this.length = length;
+
+      if (start + length > bufvoid) {
+        this.buffer = new byte[this.length];
+        final int taillen = bufvoid - start;
+        System.arraycopy(buffer, start, this.buffer, 0, taillen);
+        System.arraycopy(buffer, 0, this.buffer, taillen, length-taillen);
+        this.start = 0;
+      }
+
+      super.reset(this.buffer, this.start, this.length);
+    }
+  }
+
+  InMemValBytes createInMemValBytes() {
+    return new InMemValBytes(bufvoid);
+  }
+
+  protected class MRResultIterator implements TezRawKeyValueIterator {
+    private final DataInputBuffer keybuf = new DataInputBuffer();
+    private final InMemValBytes vbytes = createInMemValBytes();
+    private final int end;
+    private int current;
+    public MRResultIterator(int start, int end) {
+      this.end = end;
+      current = start - 1;
+    }
+    public boolean next() throws IOException {
+      return ++current < end;
+    }
+    public DataInputBuffer getKey() throws IOException {
+      final int kvoff = offsetFor(current);
+      keybuf.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART),
+          kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART));
+      return keybuf;
+    }
+    public DataInputBuffer getValue() throws IOException {
+      getVBytesForOffset(offsetFor(current), vbytes);
+      return vbytes;
+    }
+    public Progress getProgress() {
+      return null;
+    }
+    public void close() { }
+  }
+
+  private void mergeParts() throws IOException {
+    // get the approximate size of the final output/index files
+    long finalOutFileSize = 0;
+    long finalIndexFileSize = 0;
+    final Path[] filename = new Path[numSpills];
+    final String taskIdentifier = outputContext.getUniqueIdentifier();
+
+    for(int i = 0; i < numSpills; i++) {
+      filename[i] = mapOutputFile.getSpillFile(i);
+      finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
+    }
+    if (numSpills == 1) { //the spill is the final output
+      sameVolRename(filename[0],
+          mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
+      if (indexCacheList.size() == 0) {
+        sameVolRename(mapOutputFile.getSpillIndexFile(0),
+          mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
+      } else {
+        indexCacheList.get(0).writeToFile(
+          mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), conf);
+      }
+      return;
+    }
+
+    // read in paged indices
+    for (int i = indexCacheList.size(); i < numSpills; ++i) {
+      Path indexFileName = mapOutputFile.getSpillIndexFile(i);
+      indexCacheList.add(new TezSpillRecord(indexFileName, conf));
+    }
+
+    //make correction in the length to include the sequence file header
+    //lengths for each partition
+    finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
+    finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+    Path finalOutputFile =
+        mapOutputFile.getOutputFileForWrite(finalOutFileSize);
+    Path finalIndexFile =
+        mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
+
+    //The output stream for the final single output file
+    FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
+
+    if (numSpills == 0) {
+      //create dummy files
+
+      TezSpillRecord sr = new TezSpillRecord(partitions);
+      try {
+        for (int i = 0; i < partitions; i++) {
+          long segmentStart = finalOut.getPos();
+          Writer writer =
+            new Writer(conf, finalOut, keyClass, valClass, codec, null);
+          writer.close();
+
+          TezIndexRecord rec =
+              new TezIndexRecord(
+                  segmentStart,
+                  writer.getRawLength(),
+                  writer.getCompressedLength());
+          sr.putIndex(rec, i);
+        }
+        sr.writeToFile(finalIndexFile, conf);
+      } finally {
+        finalOut.close();
+      }
+      return;
+    }
+    else {
+      TezMerger.considerFinalMergeForProgress();
+
+      final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+      for (int parts = 0; parts < partitions; parts++) {
+        //create the segments to be merged
+        List<Segment> segmentList =
+          new ArrayList<Segment>(numSpills);
+        for(int i = 0; i < numSpills; i++) {
+          TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
+
+          Segment s =
+            new Segment(conf, rfs, filename[i], indexRecord.getStartOffset(),
+                             indexRecord.getPartLength(), codec, true);
+          segmentList.add(i, s);
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("TaskIdentifier=" + taskIdentifier + " Partition=" + parts +
+                "Spill =" + i + "(" + indexRecord.getStartOffset() + "," +
+                indexRecord.getRawLength() + ", " +
+                indexRecord.getPartLength() + ")");
+          }
+        }
+
+        int mergeFactor =
+            this.conf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR,
+                TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_FACTOR);
+        // sort the segments only if there are intermediate merges
+        boolean sortSegments = segmentList.size() > mergeFactor;
+        //merge
+        TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
+                       keyClass, valClass, codec,
+                       segmentList, mergeFactor,
+                       new Path(taskIdentifier),
+                       (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
+                       nullProgressable, sortSegments,
+                       null, spilledRecordsCounter,
+                       null); // Not using any Progress in TezMerger. Should just work.
+
+        //write merged output to disk
+        long segmentStart = finalOut.getPos();
+        Writer writer =
+            new Writer(conf, finalOut, keyClass, valClass, codec,
+                spilledRecordsCounter);
+        if (combiner == null || numSpills < minSpillsForCombine) {
+          TezMerger.writeFile(kvIter, writer,
+              nullProgressable, conf);
+        } else {
+          runCombineProcessor(kvIter, writer);
+        }
+        writer.close();
+
+        // record offsets
+        final TezIndexRecord rec =
+            new TezIndexRecord(
+                segmentStart,
+                writer.getRawLength(),
+                writer.getCompressedLength());
+        spillRec.putIndex(rec, parts);
+      }
+      spillRec.writeToFile(finalIndexFile, conf);
+      finalOut.close();
+      for(int i = 0; i < numSpills; i++) {
+        rfs.delete(filename[i],true);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/InMemoryShuffleSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/InMemoryShuffleSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/InMemoryShuffleSorter.java
new file mode 100644
index 0000000..92ae916
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/InMemoryShuffleSorter.java
@@ -0,0 +1,126 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common.sort.impl.dflt;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleHeader;
+import org.apache.tez.runtime.library.common.shuffle.server.ShuffleHandler;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+
+public class InMemoryShuffleSorter extends DefaultSorter {
+
+  private static final Log LOG = LogFactory.getLog(InMemoryShuffleSorter.class);
+  
+  static final int IFILE_EOF_LENGTH = 
+      2 * WritableUtils.getVIntSize(IFile.EOF_MARKER);
+  static final int IFILE_CHECKSUM_LENGTH = DataChecksum.Type.CRC32.size;
+  
+  private List<Integer> spillIndices = new ArrayList<Integer>();
+  private List<ShuffleHeader> shuffleHeaders = new ArrayList<ShuffleHeader>();
+
+  ShuffleHandler shuffleHandler = new ShuffleHandler(this);
+  
+  byte[] kvbuffer;
+  IntBuffer kvmeta;
+
+  @Override
+  public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
+    super.initialize(outputContext, conf, numOutputs);
+    shuffleHandler.initialize(outputContext, conf);
+  }
+
+  @Override
+  protected void spill(int mstart, int mend) 
+      throws IOException, InterruptedException {
+    // Start the shuffleHandler
+    shuffleHandler.start();
+
+    // Don't spill!
+    
+    // Make a copy
+    this.kvbuffer = super.kvbuffer;
+    this.kvmeta = super.kvmeta;
+
+    // Just save spill-indices for serving later
+    int spindex = mstart;
+    for (int i = 0; i < partitions; ++i) {
+      spillIndices.add(spindex);
+      
+      int length = 0;
+      while (spindex < mend &&
+          kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
+
+        final int kvoff = offsetFor(spindex);
+        int keyLen = 
+            kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART);
+        int valLen = getInMemVBytesLength(kvoff);
+        length += 
+            (keyLen + WritableUtils.getVIntSize(keyLen)) + 
+            (valLen + WritableUtils.getVIntSize(valLen));
+
+        ++spindex;
+      }
+      length += IFILE_EOF_LENGTH;
+      
+      shuffleHeaders.add( 
+          new ShuffleHeader(
+              outputContext.getUniqueIdentifier(), // TODO Verify that this is correct. 
+              length + IFILE_CHECKSUM_LENGTH, length, i)
+          );
+      LOG.info("shuffleHeader[" + i + "]:" +
+      		" rawLen=" + length + " partLen=" + (length + IFILE_CHECKSUM_LENGTH) + 
+          " spillIndex=" + spillIndices.get(i));
+    }
+    
+    LOG.info("Saved " + spillIndices.size() + " spill-indices and " + 
+        shuffleHeaders.size() + " shuffle headers");
+  }
+
+  @Override
+  public InputStream getSortedStream(int partition) {
+    return new SortBufferInputStream(this, partition);
+  }
+
+  @Override
+  public void close() throws IOException {
+    // FIXME
+    //shuffleHandler.stop();
+  }
+
+  @Override
+  public ShuffleHeader getShuffleHeader(int reduce) {
+    return shuffleHeaders.get(reduce);
+  }
+
+  public int getSpillIndex(int partition) {
+    return spillIndices.get(partition);
+  }
+
+}