Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:47 UTC

[40/50] [abbrv] Rename tez-engine-api to tez-runtime-api, and split tez-engine into two modules: tez-engine-library for user-visible Input/Output/Processor implementations, and tez-engine-internals for framework internals
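
For illustration (a sketch, assuming user-facing packages follow the module rename, as the diffs below suggest; TezOutputContext is used here only as an example):

    // before: the user-visible API under tez-engine-api
    import org.apache.tez.engine.api.TezOutputContext;
    // after: the same API under tez-runtime-api
    import org.apache.tez.runtime.api.TezOutputContext;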

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
deleted file mode 100644
index 1bf17a3..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
+++ /dev/null
@@ -1,932 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tez.engine.common.sort.impl;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.IntBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.PriorityQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.HashComparator;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.util.IndexedSortable;
-import org.apache.hadoop.util.IndexedSorter;
-import org.apache.hadoop.util.Progress;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
-
-@SuppressWarnings({"unchecked", "rawtypes"})
-public class PipelinedSorter extends ExternalSorter {
-  
-  private static final Log LOG = LogFactory.getLog(PipelinedSorter.class);
-  
-  /**
-   * The size of each record in the index file for the map-outputs.
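-   * Each record holds three longs (start offset, raw length, part length):
-   * 3 * 8 = 24 bytes. See TezIndexRecord.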
-   */
-  public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
-
-  private final static int APPROX_HEADER_LENGTH = 150;
-    
-  int partitionBits;
-  
-  private static final int PARTITION = 0;        // partition offset in acct
-  private static final int KEYSTART = 1;         // key offset in acct
-  private static final int VALSTART = 2;         // val offset in acct
-  private static final int VALLEN = 3;           // val len in acct
-  private static final int NMETA = 4;            // num meta ints
-  private static final int METASIZE = NMETA * 4; // size in bytes
-
-  // spill accounting
-  volatile Throwable sortSpillException = null;
-
-  int numSpills = 0;
-  int minSpillsForCombine;
-  private HashComparator hasher;
-  // SortSpans  
-  private SortSpan span;
-  private ByteBuffer largeBuffer;
-  // Merger
-  private SpanMerger merger; 
-  private ExecutorService sortmaster;
-
-  final ArrayList<TezSpillRecord> indexCacheList =
-    new ArrayList<TezSpillRecord>();
-  private int totalIndexCacheMemory;
-  private int indexCacheMemoryLimit;
-
-  public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
-    super.initialize(outputContext, conf, numOutputs);
-    
-    partitionBits = bitcount(partitions)+1;
-   
-    //sanity checks
-    final float spillper =
-      this.conf.getFloat(
-          TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT, 
-          TezJobConfig.DEFAULT_TEZ_ENGINE_SORT_SPILL_PERCENT);
-    final int sortmb = 
-        this.conf.getInt(
-            TezJobConfig.TEZ_ENGINE_IO_SORT_MB, 
-            TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_MB);
-    indexCacheMemoryLimit = this.conf.getInt(TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES,
-                                       TezJobConfig.DEFAULT_TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
-    if (spillper > (float)1.0 || spillper <= (float)0.0) {
-      throw new IOException("Invalid \"" + TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT +
-          "\": " + spillper);
-    }
-    if ((sortmb & 0x7FF) != sortmb) {
-      throw new IOException(
-          "Invalid \"" + TezJobConfig.TEZ_ENGINE_IO_SORT_MB + "\": " + sortmb);
-    }
-    
-    // buffers and accounting
-    int maxMemUsage = sortmb << 20;
-    maxMemUsage -= maxMemUsage % METASIZE;
-    largeBuffer = ByteBuffer.allocate(maxMemUsage);
-    LOG.info(TezJobConfig.TEZ_ENGINE_IO_SORT_MB + " = " + sortmb);
-    // TODO: configurable setting?
-    span = new SortSpan(largeBuffer, 1024*1024, 16);
-    merger = new SpanMerger(comparator);
-    final int sortThreads = 
-            this.conf.getInt(
-                TezJobConfig.TEZ_ENGINE_SORT_THREADS, 
-                TezJobConfig.DEFAULT_TEZ_ENGINE_SORT_THREADS);
-    sortmaster = Executors.newFixedThreadPool(sortThreads);
-
-    // k/v serialization    
-    if(comparator instanceof HashComparator) {
-      hasher = (HashComparator)comparator;
-      LOG.info("Using the HashComparator");
-    } else {
-      hasher = null;
-    }    
-    valSerializer.open(span.out);
-    keySerializer.open(span.out);
-    minSpillsForCombine = this.conf.getInt(TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS, 3);
-  }
-
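-  // Number of bits needed to represent n (position of the highest set bit),
-  // not a population count.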
-  private int bitcount(int n) {
-    int bit = 0;
-    while(n!=0) {
-      bit++;
-      n >>= 1;
-    }
-    return bit;
-  }
-  
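-  // Close out the current span: if enough buffer remains, queue the span on
-  // the sort thread pool and continue collecting into the remainder;
-  // otherwise sort inline and spill to reclaim the whole buffer.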
-  public void sort() throws IOException {
-    SortSpan newSpan = span.next();
-
-    if(newSpan == null) {
-      // sort in the same thread, do not wait for the thread pool
-      merger.add(span.sort(sorter, comparator));
-      spill();
-      int items = 1024*1024;
-      int perItem = 16;
-      if(span.length() != 0) {
-        items = span.length();
-        perItem = span.kvbuffer.limit()/items;
-        items = (largeBuffer.capacity())/(METASIZE+perItem);
-        if(items > 1024*1024) {
-            // our goal is to have 1M splits and sort early
-            items = 1024*1024;
-        }
-      }      
-      span = new SortSpan(largeBuffer, items, perItem);
-    } else {
-      // queue up the sort
-      SortTask task = new SortTask(span, sorter, comparator);
-      Future<SpanIterator> future = sortmaster.submit(task);
-      merger.add(future);
-      span = newSpan;
-    }
-    valSerializer.open(span.out);
-    keySerializer.open(span.out);
-  }
-
-  @Override
-  public void write(Object key, Object value) 
-      throws IOException {
-    collect(
-        key, value, partitioner.getPartition(key, value, partitions));
-  }
-
-  /**
-   * Serialize the key, value to intermediate storage.
-   * When this method returns, kvindex must refer to sufficient unused
-   * storage to store one METADATA.
-   */
-  synchronized void collect(Object key, Object value, final int partition
-                                   ) throws IOException {
-    if (key.getClass() != keyClass) {
-      throw new IOException("Type mismatch in key from map: expected "
-                            + keyClass.getName() + ", received "
-                            + key.getClass().getName());
-    }
-    if (value.getClass() != valClass) {
-      throw new IOException("Type mismatch in value from map: expected "
-                            + valClass.getName() + ", received "
-                            + value.getClass().getName());
-    }
-    if (partition < 0 || partition >= partitions) {
-      throw new IOException("Illegal partition for " + key + " (" +
-          partition + ")");
-    }
-    if(span.kvmeta.remaining() < METASIZE) {
-      this.sort();
-    }
-    int keystart = span.kvbuffer.position();
-    int valstart = -1;
-    int valend = -1;
-    try { 
-      keySerializer.serialize(key);
-      valstart = span.kvbuffer.position();      
-      valSerializer.serialize(value);
-      valend = span.kvbuffer.position();
-    } catch(BufferOverflowException overflow) {
-      // restore limit
-      span.kvbuffer.position(keystart);
-      this.sort();
-      // try again
-      this.collect(key, value, partition);
-      return;
-    }
-
-    int prefix = 0;
-
-    if(hasher != null) {
-      prefix = hasher.getHashCode(key);
-    }
-
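-    // Pack the partition id into the top partitionBits of the prefix, with
-    // the high-order hash bits below it: sorting by prefix orders records by
-    // partition first, then clusters equal keys via the hash.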
-    prefix = (partition << (32 - partitionBits)) | (prefix >>> partitionBits);
-
-    /* maintain order as in PARTITION, KEYSTART, VALSTART, VALLEN */
-    span.kvmeta.put(prefix);
-    span.kvmeta.put(keystart);
-    span.kvmeta.put(valstart);
-    span.kvmeta.put(valend - valstart);
-    if((valstart - keystart) > span.keymax) {
-      span.keymax = (valstart - keystart);
-    }
-    if((valend - valstart) > span.valmax) {
-      span.valmax = (valend - valstart);
-    }
-    mapOutputRecordCounter.increment(1);
-    mapOutputByteCounter.increment(valend - keystart);
-  }
-
-  public void spill() throws IOException { 
-    // create spill file
-    final long size = largeBuffer.capacity() + 
-      (partitions * APPROX_HEADER_LENGTH);
-    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
-    final Path filename =
-      mapOutputFile.getSpillFileForWrite(numSpills, size);    
-    FSDataOutputStream out = rfs.create(filename, true, 4096);
-
-    try {
-      merger.ready(); // wait for all the future results from sort threads
-      LOG.info("Spilling to " + filename.toString());
-      for (int i = 0; i < partitions; ++i) {
-        TezRawKeyValueIterator kvIter = merger.filter(i);
-        //write merged output to disk
-        long segmentStart = out.getPos();
-        Writer writer =
-          new Writer(conf, out, keyClass, valClass, codec,
-              spilledRecordsCounter);
-        writer.setRLE(merger.needsRLE());
-        if (combiner == null) {
-          while(kvIter.next()) {
-            writer.append(kvIter.getKey(), kvIter.getValue());
-          }
-        } else {          
-          runCombineProcessor(kvIter, writer);
-        }
-        //close
-        writer.close();
-
-        // record offsets
-        final TezIndexRecord rec = 
-            new TezIndexRecord(
-                segmentStart, 
-                writer.getRawLength(), 
-                writer.getCompressedLength());
-        spillRec.putIndex(rec, i);
-      }
-
-      Path indexFilename =
-        mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
-            * MAP_OUTPUT_INDEX_RECORD_LENGTH);
-      // TODO: cache
-      spillRec.writeToFile(indexFilename, conf);
-      ++numSpills;
-    } catch(InterruptedException ie) {
-      // restore the interrupt status and surface the failure to the caller
-      Thread.currentThread().interrupt();
-      throw new IOException("Interrupted while waiting for sorted spans", ie);
-    } finally {
-      out.close();
-    }
-  }
-
-  @Override
-  public void flush() throws IOException {
-    final String uniqueIdentifier = outputContext.getUniqueIdentifier();
-    Path finalOutputFile =
-        mapOutputFile.getOutputFileForWrite(0); //TODO
-    Path finalIndexFile =
-        mapOutputFile.getOutputIndexFileForWrite(0); //TODO
-
-    LOG.info("Starting flush of map output");
-    span.end();
-    merger.add(span.sort(sorter, comparator));
-    spill();
-    sortmaster.shutdown();
-
-    largeBuffer = null;
-
-    if(numSpills == 1) {
-      // someday be able to pass this directly to shuffle
-      // without writing to disk
-      final Path filename =
-          mapOutputFile.getSpillFile(0);
-      Path indexFilename =
-              mapOutputFile.getSpillIndexFile(0);
-      sameVolRename(filename, finalOutputFile);
-      sameVolRename(indexFilename, finalIndexFile);
-      return;
-    }
-    
-    //The output stream for the final single output file
-    FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
-
-    TezMerger.considerFinalMergeForProgress();
-
-    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
-    final ArrayList<TezSpillRecord> indexCacheList = new ArrayList<TezSpillRecord>();
-
-    for(int i = 0; i < numSpills; i++) {
-      // TODO: build this cache before
-      Path indexFilename = mapOutputFile.getSpillIndexFile(i);
-      TezSpillRecord spillIndex = new TezSpillRecord(indexFilename, conf);
-      indexCacheList.add(spillIndex);
-    }
-    
-    for (int parts = 0; parts < partitions; parts++) {
-      //create the segments to be merged
-      List<Segment> segmentList =
-          new ArrayList<Segment>(numSpills);
-      for(int i = 0; i < numSpills; i++) {
-        Path spillFilename = mapOutputFile.getSpillFile(i);
-        TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
-
-        Segment s =
-            new Segment(conf, rfs, spillFilename, indexRecord.getStartOffset(),
-                             indexRecord.getPartLength(), codec, true);
-        segmentList.add(i, s);
-      }
-
-      int mergeFactor = 
-              this.conf.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR, 
-                  TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR);
-      // sort the segments only if there are intermediate merges
-      boolean sortSegments = segmentList.size() > mergeFactor;
-      //merge
-      TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
-                     keyClass, valClass, codec,
-                     segmentList, mergeFactor,
-                     new Path(uniqueIdentifier),
-                     (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf), 
-                     nullProgressable, sortSegments,
-                     null, spilledRecordsCounter,
-                     null); // Not using any Progress in TezMerger. Should just work.
-
-      //write merged output to disk
-      long segmentStart = finalOut.getPos();
-      Writer writer =
-          new Writer(conf, finalOut, keyClass, valClass, codec,
-                           spilledRecordsCounter);
-      writer.setRLE(merger.needsRLE());
-      if (combiner == null || numSpills < minSpillsForCombine) {
-        TezMerger.writeFile(kvIter, writer, nullProgressable, conf);
-      } else {
-        runCombineProcessor(kvIter, writer);
-      }
-
-      //close
-      writer.close();
-
-      // record offsets
-      final TezIndexRecord rec = 
-          new TezIndexRecord(
-              segmentStart, 
-              writer.getRawLength(), 
-              writer.getCompressedLength());
-      spillRec.putIndex(rec, parts);
-    }
-
-    spillRec.writeToFile(finalIndexFile, conf);
-    finalOut.close();
-    for(int i = 0; i < numSpills; i++) {
-      Path indexFilename = mapOutputFile.getSpillIndexFile(i);
-      Path spillFilename = mapOutputFile.getSpillFile(i);
-      rfs.delete(indexFilename,true);
-      rfs.delete(spillFilename,true);
-    }
-  }
-
-  public void close() { }
-
-  private interface PartitionedRawKeyValueIterator extends TezRawKeyValueIterator {
-    int getPartition();
-  }
-
-  private class BufferStreamWrapper extends OutputStream 
-  {
-    private final ByteBuffer out;
-    public BufferStreamWrapper(ByteBuffer out) {
-      this.out = out;
-    }
-    
-    @Override
-    public void write(int b) throws IOException { out.put((byte)b); }
-    @Override
-    public void write(byte[] b) throws IOException { out.put(b); }
-    @Override
-    public void write(byte[] b, int off, int len) throws IOException { out.put(b, off, len); }
-  }
-
-  protected class InputByteBuffer extends DataInputBuffer {
-    private byte[] buffer = new byte[256]; 
-    private ByteBuffer wrapped = ByteBuffer.wrap(buffer);
-    private void resize(int length) {
-      if(length > buffer.length) {
-        buffer = new byte[length];
-        wrapped = ByteBuffer.wrap(buffer);
-      }
-      wrapped.limit(length);
-    }
-    public void reset(ByteBuffer b, int start, int length) {
-      resize(length);
-      b.position(start);
-      b.get(buffer, 0, length);
-      super.reset(buffer, 0, length);
-    }
-    // clone-ish function
-    public void reset(DataInputBuffer clone) {
-      byte[] data = clone.getData();
-      int start = clone.getPosition();
-      int length = clone.getLength();
-      resize(length);
-      System.arraycopy(data, start, buffer, 0, length);
-      super.reset(buffer, 0, length);
-    }
-  }
-
-  private class SortSpan  implements IndexedSortable {
-    final IntBuffer kvmeta;
-    final ByteBuffer kvbuffer;
-    final DataOutputStream out;    
-    private RawComparator comparator; 
-    final int imeta[] = new int[NMETA];
-    final int jmeta[] = new int[NMETA];
-    int keymax = 1;
-    int valmax = 1;
-    private int i,j;
-    private byte[] ki;
-    private byte[] kj;
-    private int index = 0;
-    private InputByteBuffer hay = new InputByteBuffer();
-    private long eq = 0;
-
-    public SortSpan(ByteBuffer source, int maxItems, int perItem) {
-      int capacity = source.remaining(); 
-      int metasize = METASIZE*maxItems;
-      int dataSize = maxItems * perItem;
-      if(capacity < (metasize+dataSize)) {
-        // try to allocate less meta space, because we have sample data
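-        // each record costs METASIZE bytes of metadata plus ~perItem bytes of
-        // data, so roughly capacity/(perItem+METASIZE) records fit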
-        metasize = METASIZE*(capacity/(perItem+METASIZE));
-      }
-      ByteBuffer reserved = source.duplicate();
-      reserved.mark();
-      LOG.info("reserved.remaining() = "+reserved.remaining());
-      LOG.info("reserved.size = "+metasize);
-      reserved.position(metasize);
-      kvbuffer = reserved.slice();
-      reserved.flip();
-      reserved.limit(metasize);
-      kvmeta = reserved
-                .slice()
-                .order(ByteOrder.nativeOrder())
-               .asIntBuffer();
-      out = new DataOutputStream(
-              new BufferStreamWrapper(kvbuffer));
-    }
-
-    public SpanIterator sort(IndexedSorter sorter, RawComparator comparator) {
-      this.comparator = comparator;
-      ki = new byte[keymax];
-      kj = new byte[keymax];
-      LOG.info("begin sorting Span"+index + " ("+length()+")");
-      if(length() > 1) {
-        sorter.sort(this, 0, length(), nullProgressable);
-      }
-      LOG.info("done sorting Span"+index);
-      return new SpanIterator(this);
-    }
-
-    int offsetFor(int i) {
-      return (i * NMETA);
-    }
-
-    public void swap(final int mi, final int mj) {
-      final int kvi = offsetFor(mi);
-      final int kvj = offsetFor(mj);
-
-      kvmeta.position(kvi); kvmeta.get(imeta);
-      kvmeta.position(kvj); kvmeta.get(jmeta);
-      kvmeta.position(kvj); kvmeta.put(imeta);
-      kvmeta.position(kvi); kvmeta.put(jmeta);
-
-      if(i == mi || j == mj) i = -1;
-      if(i == mi || j == mj) j = -1;
-    }
-
-    public int compare(final int mi, final int mj) {
-      final int kvi = offsetFor(mi);
-      final int kvj = offsetFor(mj);
-      final int kvip = kvmeta.get(kvi + PARTITION);
-      final int kvjp = kvmeta.get(kvj + PARTITION);
-      // sort by partition      
-      if (kvip != kvjp) {
-        return kvip - kvjp;
-      }
-      
-      final int istart = kvmeta.get(kvi + KEYSTART);
-      final int jstart = kvmeta.get(kvj + KEYSTART);
-      final int ilen   = kvmeta.get(kvi + VALSTART) - istart;
-      final int jlen   = kvmeta.get(kvj + VALSTART) - jstart;
-
-      kvbuffer.position(istart);
-      kvbuffer.get(ki, 0, ilen);
-      kvbuffer.position(jstart);
-      kvbuffer.get(kj, 0, jlen);
-      // sort by key
-      final int cmp = comparator.compare(ki, 0, ilen, kj, 0, jlen);
-      if(cmp == 0) eq++;
-      return cmp;
-    }
-
-    public SortSpan next() {
-      ByteBuffer remaining = end();
-      if(remaining != null) {
-        int items = length();
-        int perItem = kvbuffer.position()/items;
-        SortSpan newSpan = new SortSpan(remaining, items, perItem);
-        newSpan.index = index+1;
-        return newSpan;
-      }
-      return null;
-    }
-
-    public int length() {
-      return kvmeta.limit()/NMETA;
-    }
-
-    public ByteBuffer end() {
-      ByteBuffer remaining = kvbuffer.duplicate();
-      remaining.position(kvbuffer.position());
-      remaining = remaining.slice();
-      kvbuffer.limit(kvbuffer.position());
-      kvmeta.limit(kvmeta.position());
-      int items = length();
-      if(items == 0) {
-        return null;
-      }
-      int perItem = kvbuffer.position()/items;
-      LOG.info(String.format("Span%d.length = %d, perItem = %d", index, length(), perItem));
-      if(remaining.remaining() < METASIZE+perItem) {
-        return null;
-      }
-      return remaining;
-    }
-
-    private int compareInternal(DataInputBuffer needle, int needlePart, int index) {
-      int cmp = 0;
-      int keystart;
-      int valstart;
-      int partition;
-      partition = kvmeta.get(span.offsetFor(index) + PARTITION);
-      if(partition != needlePart) {
-          cmp = (partition-needlePart);
-      } else {
-        keystart = kvmeta.get(span.offsetFor(index) + KEYSTART);
-        valstart = kvmeta.get(span.offsetFor(index) + VALSTART);
-        // hay is allocated ahead of time
-        hay.reset(kvbuffer, keystart, valstart - keystart);
-        cmp = comparator.compare(hay.getData(), 
-            hay.getPosition(), hay.getLength(),
-            needle.getData(), 
-            needle.getPosition(), needle.getLength());
-      }
-      return cmp;
-    }
-    
-    public long getEq() {
-      return eq;
-    }
-    
-    @Override
-    public String toString() {
-        return String.format("Span[%d,%d]", NMETA*kvmeta.capacity(), kvbuffer.limit());
-    }
-  }
-
-  private class SpanIterator implements PartitionedRawKeyValueIterator, Comparable<SpanIterator> {
-    private int kvindex = -1;
-    private int maxindex;
-    private IntBuffer kvmeta;
-    private ByteBuffer kvbuffer;
-    private SortSpan span;
-    private InputByteBuffer key = new InputByteBuffer();
-    private InputByteBuffer value = new InputByteBuffer();
-    private Progress progress = new Progress();
-
-    private final int minrun = (1 << 4);
-
-    public SpanIterator(SortSpan span) {
-      this.kvmeta = span.kvmeta;
-      this.kvbuffer = span.kvbuffer;
-      this.span = span;
-      this.maxindex = (kvmeta.limit()/NMETA) - 1;
-    }
-
-    public DataInputBuffer getKey() throws IOException {
-      final int keystart = kvmeta.get(span.offsetFor(kvindex) + KEYSTART);
-      final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART);
-      key.reset(kvbuffer, keystart, valstart - keystart);
-      return key;
-    }
-
-    public DataInputBuffer getValue() throws IOException {
-      final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART);
-      final int vallen = kvmeta.get(span.offsetFor(kvindex) + VALLEN);
-      value.reset(kvbuffer, valstart, vallen);
-      return value;
-    }
-
-    public boolean next() throws IOException {
-      // caveat: since we use this as a comparable in the merger 
-      if(kvindex == maxindex) return false;
-      if(kvindex % 100 == 0) {
-          progress.set((float) kvindex / maxindex);
-      }
-      kvindex += 1;
-      return true;
-    }
-
-    public void close() throws IOException {
-    }
-
-    public Progress getProgress() { 
-      return progress;
-    }
-
-    public int getPartition() {
-      final int partition = kvmeta.get(span.offsetFor(kvindex) + PARTITION);
-      return partition;
-    }
-
-    public int size() {
-      return (maxindex - kvindex);
-    }
-
-    public int compareTo(SpanIterator other) {
-      try {
-        return span.compareInternal(other.getKey(), other.getPartition(), kvindex);
-      } catch(IOException ie) {
-        // since we're not reading off disk, how could getKey() throw exceptions?
-      }
-      return -1;
-    }
-    
-    @Override
-    public String toString() {
-        return String.format("SpanIterator<%d:%d> (span=%s)", kvindex, maxindex, span.toString());
-    }
-
-    /**
-     * bisect returns the next insertion point for a given raw key, skipping keys
-     * which are <= needle using a binary search instead of a linear comparison.
-     * This is far more efficient than a linear scan when long runs of identical
-     * keys occur.
-     * @param needle the raw key to position against
-     * @param needlePart the partition the needle belongs to
-     * @return the number of keys that can be skipped, or 0 if no minimum run
-     *         was found
-     */
-    int bisect(DataInputBuffer needle, int needlePart) {
-      int start = kvindex;
-      int end = maxindex-1;
-      int mid = start;
-      int cmp = 0;
-
-      if(end - start < minrun) {
-        return 0;
-      }
-
-      if(span.compareInternal(needle, needlePart, start) > 0) {
-        return kvindex;
-      }
-      
-      // bail out early if we haven't got a min run 
-      if(span.compareInternal(needle, needlePart, start+minrun) > 0) {
-        return 0;
-      }
-
-      if(span.compareInternal(needle, needlePart, end) < 0) {
-        return end - kvindex;
-      }
-      
-      boolean found = false;
-      
-      // a span holds at most ~1M items, so a full binary search needs at most
-      // 20 iterations; cap at 16 and break early
-      for(int i = 0; start < end && i < 16; i++) {
-        mid = start + (end - start)/2;
-        cmp = span.compareInternal(needle, needlePart, mid);
-        if(cmp == 0) {
-          start = mid;
-          found = true;
-        } else if(cmp < 0) {
-          start = mid; 
-          found = true;
-        }
-        if(cmp > 0) {
-          end = mid;
-        }
-      }
-
-      if(found) {
-        return start - kvindex;
-      }
-      return 0;
-    }
-  }
-
-  private class SortTask implements Callable<SpanIterator> {
-    private final SortSpan sortable;
-    private final IndexedSorter sorter;
-    private final RawComparator comparator;
-    
-    public SortTask(SortSpan sortable, 
-              IndexedSorter sorter, RawComparator comparator) {
-        this.sortable = sortable;
-        this.sorter = sorter;
-        this.comparator = comparator;
-    }
-
-    public SpanIterator call() {
-      return sortable.sort(sorter, comparator);
-    }
-  }
-
-  private class PartitionFilter implements TezRawKeyValueIterator {
-    private final PartitionedRawKeyValueIterator iter;
-    private int partition;
-    private boolean dirty = false;
-    public PartitionFilter(PartitionedRawKeyValueIterator iter) {
-      this.iter = iter;
-    }
-    public DataInputBuffer getKey() throws IOException { return iter.getKey(); }
-    public DataInputBuffer getValue() throws IOException { return iter.getValue(); }
-    public void close() throws IOException { }
-    public Progress getProgress() {
-      return new Progress();
-    }
-    public boolean next() throws IOException {
-      if(dirty || iter.next()) { 
-        int prefix = iter.getPartition();
-
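-        // the top partitionBits of the prefix hold the partition id (mirrors
-        // the packing in collect())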
-        if((prefix >>> (32 - partitionBits)) == partition) {
-          dirty = false; // we found what we were looking for, good
-          return true;
-        } else if(!dirty) {
-          dirty = true; // we did a lookahead and failed to find partition
-        }
-      }
-      return false;
-    }
-
-    public void reset(int partition) {
-      this.partition = partition;
-    }
-
-    public int getPartition() {
-      return this.partition;
-    }
-  }
-
-  private class SpanHeap extends java.util.PriorityQueue<SpanIterator> {
-    public SpanHeap() {
-      super(256);
-    }
-    /**
-     * {@link PriorityQueue}.poll() by a different name 
-     * @return
-     */
-    public SpanIterator pop() {
-      return this.poll();
-    }
-  }
-
-  private class SpanMerger implements PartitionedRawKeyValueIterator {
-    private final RawComparator comparator;
-    InputByteBuffer key = new InputByteBuffer();
-    InputByteBuffer value = new InputByteBuffer();
-    int partition;
-
-    private ArrayList< Future<SpanIterator>> futures = new ArrayList< Future<SpanIterator>>();
-
-    private SpanHeap heap = new SpanHeap();
-    private PartitionFilter partIter;
-
-    private int gallop = 0;
-    private SpanIterator horse;
-    private long total = 0;
-    private long count = 0;
-    private long eq = 0;
-    
-    public SpanMerger(RawComparator comparator) {
-      this.comparator = comparator;
-      partIter = new PartitionFilter(this);
-    }
-
-    public void add(SpanIterator iter) throws IOException{
-      if(iter.next()) {
-        heap.add(iter);
-      }
-    }
-
-    public void add(Future<SpanIterator> iter) throws IOException{
-      this.futures.add(iter);
-    }
-
-    public boolean ready() throws IOException, InterruptedException {
-      try {
-        SpanIterator iter = null;
-        while(this.futures.size() > 0) {
-          Future<SpanIterator> futureIter = this.futures.remove(0);
-          iter = futureIter.get();
-          this.add(iter);
-        }
-        
-        StringBuilder sb = new StringBuilder();
-        for(SpanIterator sp: heap) {
-            sb.append(sp.toString());
-            sb.append(",");
-            total += sp.span.length();
-            eq += sp.span.getEq();
-        }
-        LOG.info("Heap = " + sb.toString());
-        return true;
-      } catch(Exception e) {
-        LOG.info("SpanMerger failed to become ready", e);
-        return false;
-      }
-    }
-
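-    // "Galloping": when the same iterator wins consecutive pops, bisect()
-    // tells us how many of its upcoming keys still sort before the runner-up's
-    // head, and we return it that many times without re-heapifying.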
-    private SpanIterator pop() throws IOException {
-      if(gallop > 0) {
-        gallop--;
-        return horse;
-      }
-      SpanIterator current = heap.pop();
-      SpanIterator next = heap.peek();
-      if(next != null && current != null &&
-        ((Object)horse) == ((Object)current)) {
-        // TODO: a better threshold check
-        gallop = current.bisect(next.getKey(), next.getPartition())-1;
-      }
-      horse = current;
-      return current;
-    }
-    
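-    // Suggest run-length encoding when at least 10% of key comparisons during
-    // sorting saw equal keys.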
-    public boolean needsRLE() {
-      return (eq > 0.1 * total);
-    }
-    
-    private SpanIterator peek() throws IOException {
-      if(gallop > 0) {
-        return horse;
-      }
-      return heap.peek();
-    }
-
-    public boolean next() throws IOException {
-      SpanIterator current = pop();
-
-      if(current != null) {
-        // keep local copies, since add() will move it all out
-        key.reset(current.getKey());
-        value.reset(current.getValue());
-        partition = current.getPartition();
-        if(gallop <= 0) {
-          this.add(current);
-        } else {
-          // galloping
-          current.next();
-        }
-        return true;
-      }
-      return false;
-    }
-
-    public DataInputBuffer getKey() throws IOException { return key; }
-    public DataInputBuffer getValue() throws IOException { return value; }
-    public int getPartition() { return partition; }
-
-    public void close() throws IOException {
-    }
-
-    public Progress getProgress() {
-      // TODO
-      return new Progress();
-    }
-
-    public TezRawKeyValueIterator filter(int partition) {
-      partIter.reset(partition);
-      return partIter;
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezIndexRecord.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezIndexRecord.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezIndexRecord.java
deleted file mode 100644
index ac0267c..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezIndexRecord.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common.sort.impl;
-
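-/**
- * One index record per partition in a spill index file: three longs
- * (start offset, raw length, part length), 24 bytes in total; see
- * PipelinedSorter.MAP_OUTPUT_INDEX_RECORD_LENGTH.
- */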
-public class TezIndexRecord {
-  private long startOffset;
-  private long rawLength;
-  private long partLength;
-
-  public TezIndexRecord() { }
-
-  public TezIndexRecord(long startOffset, long rawLength, long partLength) {
-    this.startOffset = startOffset;
-    this.rawLength = rawLength;
-    this.partLength = partLength;
-  }
-
-  public long getStartOffset() {
-    return startOffset;
-  }
-
-  public long getRawLength() {
-    return rawLength;
-  }
-
-  public long getPartLength() {
-    return partLength;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
deleted file mode 100644
index 7815569..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
+++ /dev/null
@@ -1,798 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.sort.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ChecksumFileSystem;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.util.PriorityQueue;
-import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.Progressable;
-import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.common.sort.impl.IFile.Reader;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-
-/**
- * Merger is a utility class used by the Map and Reduce tasks for merging
- * both their in-memory and on-disk segments.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-@SuppressWarnings({"unchecked", "rawtypes"})
-public class TezMerger {  
-  private static final Log LOG = LogFactory.getLog(TezMerger.class);
-
-  
-  // Local directories
-  private static LocalDirAllocator lDirAlloc = 
-    new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-
-  public static
-  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
-                            Class keyClass, Class valueClass, 
-                            CompressionCodec codec,
-                            Path[] inputs, boolean deleteInputs, 
-                            int mergeFactor, Path tmpDir,
-                            RawComparator comparator, Progressable reporter,
-                            TezCounter readsCounter,
-                            TezCounter writesCounter,
-                            Progress mergePhase)
-  throws IOException {
-    return 
-      new MergeQueue(conf, fs, inputs, deleteInputs, codec, comparator, 
-                           reporter, null).merge(keyClass, valueClass,
-                                           mergeFactor, tmpDir,
-                                           readsCounter, writesCounter, 
-                                           mergePhase);
-  }
-
-  public static 
-  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
-                            Class keyClass, Class valueClass, 
-                            CompressionCodec codec,
-                            Path[] inputs, boolean deleteInputs, 
-                            int mergeFactor, Path tmpDir,
-                            RawComparator comparator,
-                            Progressable reporter,
-                            TezCounter readsCounter,
-                            TezCounter writesCounter,
-                            TezCounter mergedMapOutputsCounter,
-                            Progress mergePhase)
-  throws IOException {
-    return 
-      new MergeQueue(conf, fs, inputs, deleteInputs, codec, comparator, 
-                           reporter, mergedMapOutputsCounter).merge(
-                                           keyClass, valueClass,
-                                           mergeFactor, tmpDir,
-                                           readsCounter, writesCounter,
-                                           mergePhase);
-  }
-  
-  public static
-  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs, 
-                            Class keyClass, Class valueClass, 
-                            List<Segment> segments, 
-                            int mergeFactor, Path tmpDir,
-                            RawComparator comparator, Progressable reporter,
-                            TezCounter readsCounter,
-                            TezCounter writesCounter,
-                            Progress mergePhase)
-      throws IOException {
-    return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
-                 comparator, reporter, false, readsCounter, writesCounter,
-                 mergePhase);
-  }
-
-  public static <K extends Object, V extends Object>
-  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
-                            Class keyClass, Class valueClass,
-                            List<Segment> segments,
-                            int mergeFactor, Path tmpDir,
-                            RawComparator comparator, Progressable reporter,
-                            boolean sortSegments,
-                            TezCounter readsCounter,
-                            TezCounter writesCounter,
-                            Progress mergePhase)
-      throws IOException {
-    return new MergeQueue(conf, fs, segments, comparator, reporter,
-                           sortSegments).merge(keyClass, valueClass,
-                                               mergeFactor, tmpDir,
-                                               readsCounter, writesCounter,
-                                               mergePhase);
-  }
-
-  public static <K extends Object, V extends Object>
-  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
-                            Class keyClass, Class valueClass,
-                            CompressionCodec codec,
-                            List<Segment> segments,
-                            int mergeFactor, Path tmpDir,
-                            RawComparator comparator, Progressable reporter,
-                            boolean sortSegments,
-                            TezCounter readsCounter,
-                            TezCounter writesCounter,
-                            Progress mergePhase)
-      throws IOException {
-    return new MergeQueue(conf, fs, segments, comparator, reporter,
-                           sortSegments, codec).merge(keyClass, valueClass,
-                                               mergeFactor, tmpDir,
-                                               readsCounter, writesCounter,
-                                               mergePhase);
-  }
-
-  public static <K extends Object, V extends Object>
-    TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
-                            Class keyClass, Class valueClass,
-                            List<Segment> segments,
-                            int mergeFactor, int inMemSegments, Path tmpDir,
-                            RawComparator comparator, Progressable reporter,
-                            boolean sortSegments,
-                            TezCounter readsCounter,
-                            TezCounter writesCounter,
-                            Progress mergePhase)
-      throws IOException {
-    return new MergeQueue(conf, fs, segments, comparator, reporter,
-                           sortSegments).merge(keyClass, valueClass,
-                                               mergeFactor, inMemSegments,
-                                               tmpDir,
-                                               readsCounter, writesCounter,
-                                               mergePhase);
-  }
-
-
-  static <K extends Object, V extends Object>
-  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
-                          Class keyClass, Class valueClass,
-                          CompressionCodec codec,
-                          List<Segment> segments,
-                          int mergeFactor, int inMemSegments, Path tmpDir,
-                          RawComparator comparator, Progressable reporter,
-                          boolean sortSegments,
-                          TezCounter readsCounter,
-                          TezCounter writesCounter,
-                          Progress mergePhase)
-    throws IOException {
-    return new MergeQueue(conf, fs, segments, comparator, reporter,
-                          sortSegments, codec).merge(keyClass, valueClass,
-                                              mergeFactor, inMemSegments,
-                                              tmpDir,
-                                              readsCounter, writesCounter,
-                                              mergePhase);
-  }
-
-  public static <K extends Object, V extends Object>
-  void writeFile(TezRawKeyValueIterator records, Writer writer, 
-                 Progressable progressable, Configuration conf) 
-  throws IOException {
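-    // Report progress every RECORDS_BEFORE_PROGRESS records so the framework
-    // does not treat a long merge as a hang.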
-    long progressBar = 
-        conf.getLong(TezJobConfig.RECORDS_BEFORE_PROGRESS, 
-            TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
-    long recordCtr = 0;
-    while(records.next()) {
-      writer.append(records.getKey(), records.getValue());
-      
-      if (((recordCtr++) % progressBar) == 0) {
-        progressable.progress();
-      }
-    }
-  }
-
-  @InterfaceAudience.Private
-  @InterfaceStability.Unstable
-  public static class Segment<K extends Object, V extends Object> {
-    Reader reader = null;
-    final DataInputBuffer key = new DataInputBuffer();
-    
-    Configuration conf = null;
-    FileSystem fs = null;
-    Path file = null;
-    boolean preserve = false;
-    CompressionCodec codec = null;
-    long segmentOffset = 0;
-    long segmentLength = -1;
-    
-    TezCounter mapOutputsCounter = null;
-
-    public Segment(Configuration conf, FileSystem fs, Path file,
-                   CompressionCodec codec, boolean preserve)
-    throws IOException {
-      this(conf, fs, file, codec, preserve, null);
-    }
-
-    public Segment(Configuration conf, FileSystem fs, Path file,
-                   CompressionCodec codec, boolean preserve,
-                   TezCounter mergedMapOutputsCounter)
-  throws IOException {
-      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve, 
-           mergedMapOutputsCounter);
-    }
-
-    public Segment(Configuration conf, FileSystem fs, Path file,
-                   long segmentOffset, long segmentLength,
-                   CompressionCodec codec,
-                   boolean preserve) throws IOException {
-      this(conf, fs, file, segmentOffset, segmentLength, codec, preserve, null);
-    }
-
-    public Segment(Configuration conf, FileSystem fs, Path file,
-        long segmentOffset, long segmentLength, CompressionCodec codec,
-        boolean preserve, TezCounter mergedMapOutputsCounter)
-    throws IOException {
-      this.conf = conf;
-      this.fs = fs;
-      this.file = file;
-      this.codec = codec;
-      this.preserve = preserve;
-
-      this.segmentOffset = segmentOffset;
-      this.segmentLength = segmentLength;
-      
-      this.mapOutputsCounter = mergedMapOutputsCounter;
-    }
-    
-    public Segment(Reader reader, boolean preserve) {
-      this(reader, preserve, null);
-    }
-    
-    public Segment(Reader reader, boolean preserve, 
-                   TezCounter mapOutputsCounter) {
-      this.reader = reader;
-      this.preserve = preserve;
-      
-      this.segmentLength = reader.getLength();
-      
-      this.mapOutputsCounter = mapOutputsCounter;
-    }
-
-    void init(TezCounter readsCounter) throws IOException {
-      if (reader == null) {
-        FSDataInputStream in = fs.open(file);
-        in.seek(segmentOffset);
-        reader = new Reader(conf, in, segmentLength, codec, readsCounter);
-      }
-      
-      if (mapOutputsCounter != null) {
-        mapOutputsCounter.increment(1);
-      }
-    }
-    
-    boolean inMemory() {
-      return fs == null;
-    }
-    
-    DataInputBuffer getKey() { return key; }
-
-    DataInputBuffer getValue(DataInputBuffer value) throws IOException {
-      nextRawValue(value);
-      return value;
-    }
-
-    public long getLength() { 
-      return (reader == null) ?
-        segmentLength : reader.getLength();
-    }
-    
-    boolean nextRawKey() throws IOException {
-      return reader.nextRawKey(key);
-    }
-
-    void nextRawValue(DataInputBuffer value) throws IOException {
-      reader.nextRawValue(value);
-    }
-
-    void closeReader() throws IOException {
-      if (reader != null) {
-        reader.close();
-        reader = null;
-      }
-    }
-    
-    void close() throws IOException {
-      closeReader();
-      if (!preserve && fs != null) {
-        fs.delete(file, false);
-      }
-    }
-
-    public long getPosition() throws IOException {
-      return reader.getPosition();
-    }
-
-    // This method is used by BackupStore to extract the 
-    // absolute position after a reset
-    long getActualPosition() throws IOException {
-      return segmentOffset + reader.getPosition();
-    }
-
-    Reader getReader() {
-      return reader;
-    }
-    
-    // This method is used by BackupStore to reinitialize the
-    // reader to start reading from a different segment offset
-    void reinitReader(int offset) throws IOException {
-      if (!inMemory()) {
-        closeReader();
-        segmentOffset = offset;
-        segmentLength = fs.getFileStatus(file).getLen() - segmentOffset;
-        init(null);
-      }
-    }
-  }
-  
-  // Boolean variable for including/considering final merge as part of sort
-  // phase or not. This is true in map task, false in reduce task. It is
-  // used in calculating mergeProgress.
-  static boolean includeFinalMerge = false;
-  
-  /**
-   * Sets the boolean variable includeFinalMerge to true. Called from
-   * map task before calling merge() so that final merge of map task
-   * is also considered as part of sort phase.
-   */
-  public static void considerFinalMergeForProgress() {
-    includeFinalMerge = true;
-  }
-  
-  private static class MergeQueue<K extends Object, V extends Object> 
-  extends PriorityQueue<Segment> implements TezRawKeyValueIterator {
-    Configuration conf;
-    FileSystem fs;
-    CompressionCodec codec;
-    
-    List<Segment> segments = new ArrayList<Segment>();
-    
-    RawComparator comparator;
-    
-    private long totalBytesProcessed;
-    private float progPerByte;
-    private Progress mergeProgress = new Progress();
-    
-    Progressable reporter;
-    
-    DataInputBuffer key;
-    final DataInputBuffer value = new DataInputBuffer();
-    final DataInputBuffer diskIFileValue = new DataInputBuffer();
-    
-    Segment minSegment;
-    Comparator<Segment> segmentComparator =   
-      new Comparator<Segment>() {
-      public int compare(Segment o1, Segment o2) {
-        if (o1.getLength() == o2.getLength()) {
-          return 0;
-        }
-
-        return o1.getLength() < o2.getLength() ? -1 : 1;
-      }
-    };
-
-    public MergeQueue(Configuration conf, FileSystem fs, 
-                      Path[] inputs, boolean deleteInputs, 
-                      CompressionCodec codec, RawComparator comparator,
-                      Progressable reporter, 
-                      TezCounter mergedMapOutputsCounter) 
-    throws IOException {
-      this.conf = conf;
-      this.fs = fs;
-      this.codec = codec;
-      this.comparator = comparator;
-      this.reporter = reporter;
-      
-      for (Path file : inputs) {
-        LOG.debug("MergeQ: adding: " + file);
-        segments.add(new Segment(conf, fs, file, codec, !deleteInputs, 
-                                       (file.toString().endsWith(
-                                           Constants.MERGED_OUTPUT_PREFIX) ? 
-                                        null : mergedMapOutputsCounter)));
-      }
-      
-      // Sort segments on file-lengths
-      Collections.sort(segments, segmentComparator); 
-    }
-    
-    public MergeQueue(Configuration conf, FileSystem fs, 
-        List<Segment> segments, RawComparator comparator,
-        Progressable reporter, boolean sortSegments) {
-      this.conf = conf;
-      this.fs = fs;
-      this.comparator = comparator;
-      this.segments = segments;
-      this.reporter = reporter;
-      if (sortSegments) {
-        Collections.sort(segments, segmentComparator);
-      }
-    }
-
-    public MergeQueue(Configuration conf, FileSystem fs,
-        List<Segment> segments, RawComparator comparator,
-        Progressable reporter, boolean sortSegments, CompressionCodec codec) {
-      this(conf, fs, segments, comparator, reporter, sortSegments);
-      this.codec = codec;
-    }
-
-    public void close() throws IOException {
-      Segment segment;
-      while((segment = pop()) != null) {
-        segment.close();
-      }
-    }
-
-    public DataInputBuffer getKey() throws IOException {
-      return key;
-    }
-
-    public DataInputBuffer getValue() throws IOException {
-      return value;
-    }
-
-    private void adjustPriorityQueue(Segment reader) throws IOException{
-      long startPos = reader.getPosition();
-      boolean hasNext = reader.nextRawKey();
-      long endPos = reader.getPosition();
-      totalBytesProcessed += endPos - startPos;
-      mergeProgress.set(totalBytesProcessed * progPerByte);
-      if (hasNext) {
-        adjustTop();
-      } else {
-        pop();
-        reader.close();
-      }
-    }
-
-    public boolean next() throws IOException {
-      if (size() == 0)
-        return false;
-
-      if (minSegment != null) {
-        //minSegment is non-null for all invocations of next except the first
-        //one. For the first invocation, the priority queue is ready for use
-        //but for the subsequent invocations, first adjust the queue 
-        adjustPriorityQueue(minSegment);
-        if (size() == 0) {
-          minSegment = null;
-          return false;
-        }
-      }
-      minSegment = top();
-      if (!minSegment.inMemory()) {
-        //When we load the value from an inmemory segment, we reset
-        //the "value" DIB in this class to the inmem segment's byte[].
-        //When we load the value bytes from disk, we shouldn't use
-        //the same byte[] since it would corrupt the data in the inmem
-        //segment. So we maintain an explicit DIB for value bytes
-        //obtained from disk, and if the current segment is a disk
-        //segment, we reset the "value" DIB to the byte[] in that (so 
-        //we reuse the disk segment DIB whenever we consider
-        //a disk segment).
-        value.reset(diskIFileValue.getData(), diskIFileValue.getLength());
-      }
-      long startPos = minSegment.getPosition();
-      key = minSegment.getKey();
-      minSegment.getValue(value);
-      long endPos = minSegment.getPosition();
-      totalBytesProcessed += endPos - startPos;
-      mergeProgress.set(totalBytesProcessed * progPerByte);
-      return true;
-    }
-
-    protected boolean lessThan(Object a, Object b) {
-      DataInputBuffer key1 = ((Segment)a).getKey();
-      DataInputBuffer key2 = ((Segment)b).getKey();
-      int s1 = key1.getPosition();
-      int l1 = key1.getLength() - s1;
-      int s2 = key2.getPosition();
-      int l2 = key2.getLength() - s2;
-
-      return comparator.compare(key1.getData(), s1, l1, key2.getData(), s2, l2) < 0;
-    }
-    
-    public TezRawKeyValueIterator merge(Class keyClass, Class valueClass,
-                                     int factor, Path tmpDir,
-                                     TezCounter readsCounter,
-                                     TezCounter writesCounter,
-                                     Progress mergePhase)
-        throws IOException {
-      return merge(keyClass, valueClass, factor, 0, tmpDir,
-                   readsCounter, writesCounter, mergePhase);
-    }
-
-    TezRawKeyValueIterator merge(Class keyClass, Class valueClass,
-                                     int factor, int inMem, Path tmpDir,
-                                     TezCounter readsCounter,
-                                     TezCounter writesCounter,
-                                     Progress mergePhase)
-        throws IOException {
-      LOG.info("Merging " + segments.size() + " sorted segments");
-
-      /*
-       * If there are in-memory segments, they come first in the segments
-       * list, followed by the sorted disk segments. Otherwise (if there are
-       * only disk segments), the segments are sorted only when there are more
-       * than 'factor' segments in the list.
-       */
-      int numSegments = segments.size();
-      int origFactor = factor;
-      int passNo = 1;
-      if (mergePhase != null) {
-        mergeProgress = mergePhase;
-      }
-
-      long totalBytes = computeBytesInMerges(factor, inMem);
-      if (totalBytes != 0) {
-        progPerByte = 1.0f / (float)totalBytes;
-      }
-      
-      //create the MergeStreams from the sorted map created in the constructor
-      //and dump the final output to a file
-      do {
-        //get the factor for this pass of merge. We assume in-memory segments
-        //are the first entries in the segment list and that the pass factor
-        //doesn't apply to them
-        factor = getPassFactor(factor, passNo, numSegments - inMem);
-        if (1 == passNo) {
-          factor += inMem;
-        }
-        List<Segment> segmentsToMerge =
-          new ArrayList<Segment>();
-        int segmentsConsidered = 0;
-        int numSegmentsToConsider = factor;
-        long startBytes = 0; // starting bytes of segments of this merge
-        while (true) {
-          //extract the smallest 'factor' number of segments  
-          //Call cleanup on the empty segments (no key/value data)
-          List<Segment> mStream = 
-            getSegmentDescriptors(numSegmentsToConsider);
-          for (Segment segment : mStream) {
-            // Initialize the segment at the last possible moment;
-            // this helps in ensuring we don't use buffers until we need them
-            segment.init(readsCounter);
-            long startPos = segment.getPosition();
-            boolean hasNext = segment.nextRawKey();
-            long endPos = segment.getPosition();
-            
-            if (hasNext) {
-              startBytes += endPos - startPos;
-              segmentsToMerge.add(segment);
-              segmentsConsidered++;
-            }
-            else {
-              segment.close();
-              numSegments--; //we ignore this segment for the merge
-            }
-          }
-          //if we have the desired number of segments
-          //or looked at all available segments, we break
-          if (segmentsConsidered == factor || 
-              segments.size() == 0) {
-            break;
-          }
-            
-          numSegmentsToConsider = factor - segmentsConsidered;
-        }
-        
-        //feed the streams to the priority queue
-        initialize(segmentsToMerge.size());
-        clear();
-        for (Segment segment : segmentsToMerge) {
-          put(segment);
-        }
-        
-        //if the number of remaining segments is within the merge factor, just
-        //return the iterator; otherwise do another single-level merge
-        if (numSegments <= factor) {
-          if (!includeFinalMerge) { // for reduce task
-
-            // Reset totalBytesProcessed and recalculate totalBytes from the
-            // remaining segments to track the progress of the final merge.
-            // Final merge is considered as the progress of the reducePhase,
-            // the 3rd phase of reduce task.
-            totalBytesProcessed = 0;
-            totalBytes = 0;
-            for (int i = 0; i < segmentsToMerge.size(); i++) {
-              totalBytes += segmentsToMerge.get(i).getLength();
-            }
-          }
-          if (totalBytes != 0) //being paranoid
-            progPerByte = 1.0f / (float)totalBytes;
-          
-          totalBytesProcessed += startBytes;         
-          if (totalBytes != 0)
-            mergeProgress.set(totalBytesProcessed * progPerByte);
-          else
-            mergeProgress.set(1.0f); // Last pass and no segments left - we're done
-          
-          LOG.info("Down to the last merge-pass, with " + numSegments + 
-                   " segments left of total size: " +
-                   (totalBytes - totalBytesProcessed) + " bytes");
-          return this;
-        } else {
-          LOG.info("Merging " + segmentsToMerge.size() + 
-                   " intermediate segments out of a total of " + 
-                   (segments.size()+segmentsToMerge.size()));
-          
-          long bytesProcessedInPrevMerges = totalBytesProcessed;
-          totalBytesProcessed += startBytes;
-
-          //we want to spread the creation of temp files on multiple disks if 
-          //available under the space constraints
-          long approxOutputSize = 0; 
-          for (Segment s : segmentsToMerge) {
-            approxOutputSize += s.getLength() + 
-                                ChecksumFileSystem.getApproxChkSumLength(
-                                s.getLength());
-          }
-          Path tmpFilename = 
-            new Path(tmpDir, "intermediate").suffix("." + passNo);
-
-          Path outputFile =  lDirAlloc.getLocalPathForWrite(
-                                              tmpFilename.toString(),
-                                              approxOutputSize, conf);
-
-          Writer writer = 
-            new Writer(conf, fs, outputFile, keyClass, valueClass, codec,
-                             writesCounter);
-          writeFile(this, writer, reporter, conf);
-          writer.close();
-          
-          //we finished one single level merge; now clean up the priority 
-          //queue
-          this.close();
-
-          // Add the newly created segment to the list of segments to be merged
-          Segment tempSegment = 
-            new Segment(conf, fs, outputFile, codec, false);
-
-          // Insert new merged segment into the sorted list
-          int pos = Collections.binarySearch(segments, tempSegment,
-                                             segmentComparator);
-          if (pos < 0) {
-            // binary search failed; the insertion point is -pos-1
-            pos = -pos-1;
-          }
-          segments.add(pos, tempSegment);
-          numSegments = segments.size();
-          
-          // Subtract the difference between the expected size of the new
-          // segment (inputBytesOfThisMerge) and its actual size from
-          // totalBytes. Expected and actual sizes will match (almost) if no
-          // combiner is called during the merge.
-          long inputBytesOfThisMerge = totalBytesProcessed -
-                                       bytesProcessedInPrevMerges;
-          totalBytes -= inputBytesOfThisMerge - tempSegment.getLength();
-          if (totalBytes != 0) {
-            progPerByte = 1.0f / (float)totalBytes;
-          }
-          
-          passNo++;
-        }
-        //only the first pass uses a modified merge factor, so reset the
-        //factor to its original value
-        factor = origFactor;
-      } while(true);
-    }
-    
-    /**
-     * Determine the number of segments to merge in a given pass. Assuming
-     * more than factor segments, the first pass should merge just enough
-     * segments to make (numSegments - 1) divisible by (factor - 1), since
-     * each pass consumes several segments and produces one; this minimizes
-     * the number of merges.
-     */
-    private int getPassFactor(int factor, int passNo, int numSegments) {
-      if (passNo > 1 || numSegments <= factor || factor == 1) 
-        return factor;
-      int mod = (numSegments - 1) % (factor - 1);
-      if (mod == 0)
-        return factor;
-      return mod + 1;
-    }
-    
-    /** Return (& remove) the requested number of segment descriptors from the
-     * sorted list.
-     */
-    private List<Segment> getSegmentDescriptors(int numDescriptors) {
-      if (numDescriptors > segments.size()) {
-        List<Segment> subList = new ArrayList<Segment>(segments);
-        segments.clear();
-        return subList;
-      }
-      
-      List<Segment> subList = 
-        new ArrayList<Segment>(segments.subList(0, numDescriptors));
-      for (int i=0; i < numDescriptors; ++i) {
-        segments.remove(0);
-      }
-      return subList;
-    }
-    
-    /**
-     * Compute the expected number of input bytes across all merges; this is
-     * used in calculating mergeProgress. It simulates the merge() method
-     * above to estimate the number of bytes that will be merged in all
-     * passes (assuming that no combiner is called while merging).
-     * @param factor mapreduce.task.io.sort.factor
-     * @param inMem  number of segments in memory to be merged
-     */
-    long computeBytesInMerges(int factor, int inMem) {
-      int numSegments = segments.size();
-      List<Long> segmentSizes = new ArrayList<Long>(numSegments);
-      long totalBytes = 0;
-      int n = numSegments - inMem;
-      // factor for 1st pass
-      int f = getPassFactor(factor, 1, n) + inMem;
-      n = numSegments;
- 
-      for (int i = 0; i < numSegments; i++) {
-        // Empty segments are not handled here, on the assumption that they
-        // do not materially affect the mergeProgress calculation.
-        segmentSizes.add(segments.get(i).getLength());
-      }
-      
-      // If includeFinalMerge is true, allow the following while loop to run
-      // for one extra iteration, so that the final merge is included in the
-      // computation of the expected input bytes of merges.
-      boolean considerFinalMerge = includeFinalMerge;
-      
-      while (n > f || considerFinalMerge) {
-        if (n <= f) {
-          considerFinalMerge = false;
-        }
-        long mergedSize = 0;
-        f = Math.min(f, segmentSizes.size());
-        for (int j = 0; j < f; j++) {
-          mergedSize += segmentSizes.remove(0);
-        }
-        totalBytes += mergedSize;
-        
-        // insert new size into the sorted list
-        int pos = Collections.binarySearch(segmentSizes, mergedSize);
-        if (pos < 0) {
-          pos = -pos-1;
-        }
-        segmentSizes.add(pos, mergedSize);
-        
-        n -= (f-1);
-        f = factor;
-      }
-
-      return totalBytes;
-    }
-
-    public Progress getProgress() {
-      return mergeProgress;
-    }
-
-  }
-}
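
As an aside on the merge logic above: the pass-factor rule in
getPassFactor() is easiest to see with a small standalone simulation. The
sketch below is illustrative only (PassFactorDemo is not part of the Tez
source; it merely mirrors the deleted method) and shows how the first pass
merges just enough segments to make (numSegments - 1) divisible by
(factor - 1), so that every later pass can run at the full factor:

public class PassFactorDemo {
  // Mirrors the logic of getPassFactor() from PipelinedSorter above.
  static int getPassFactor(int factor, int passNo, int numSegments) {
    if (passNo > 1 || numSegments <= factor || factor == 1) {
      return factor;
    }
    int mod = (numSegments - 1) % (factor - 1);
    return (mod == 0) ? factor : mod + 1;
  }

  public static void main(String[] args) {
    int segments = 40;
    int factor = 10;
    int passNo = 1;
    while (segments > 1) {
      int f = Math.min(getPassFactor(factor, passNo, segments), segments);
      segments = segments - f + 1;  // each pass consumes f segments, emits 1
      System.out.println("pass " + passNo + ": merged " + f
          + " segments, " + segments + " remaining");
      passNo++;
    }
  }
}

With 40 segments and factor 10, pass 1 merges only 4 segments
(mod = 39 % 9 = 3, so f = 4), leaving 37; passes 2 through 5 then each
merge a full 10. This is also the pass structure that
computeBytesInMerges() simulates when pre-computing the byte total used
for mergeProgress.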

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezRawKeyValueIterator.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezRawKeyValueIterator.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezRawKeyValueIterator.java
deleted file mode 100644
index 39cffcb..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezRawKeyValueIterator.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.sort.impl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.util.Progress;
-
-/**
- * <code>TezRawKeyValueIterator</code> is an iterator used to iterate over
- * the raw keys and values during sort/merge of intermediate data. 
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public interface TezRawKeyValueIterator {
-  /** 
-   * Gets the current raw key.
-   * 
-   * @return the current raw key as a DataInputBuffer
-   * @throws IOException
-   */
-  DataInputBuffer getKey() throws IOException;
-  
-  /** 
-   * Gets the current raw value.
-   * 
-   * @return the current raw value as a DataInputBuffer
-   * @throws IOException
-   */
-  DataInputBuffer getValue() throws IOException;
-  
-  /** 
-   * Sets up the current key and value (for getKey and getValue).
-   * 
-   * @return <code>true</code> if there exists a key/value, 
-   *         <code>false</code> otherwise. 
-   * @throws IOException
-   */
-  boolean next() throws IOException;
-  
-  /** 
-   * Closes the iterator so that the underlying streams can be closed.
-   * 
-   * @throws IOException
-   */
-  void close() throws IOException;
-  
-  /** Gets the Progress object; this holds a float (0.0 - 1.0)
-   * indicating the fraction of bytes processed by the iterator so far.
-   */
-  Progress getProgress();
-}
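
For context, the interface above is consumed in a simple
next()/getKey()/getValue() loop. A minimal usage sketch follows; the
drain() and process() methods here are hypothetical placeholders for
illustration, not part of the Tez source:

import java.io.IOException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;

public class IteratorUsageSketch {
  // Drains a sorted/merged stream, closing it even on failure.
  static void drain(TezRawKeyValueIterator iter) throws IOException {
    try {
      while (iter.next()) {               // position the next key/value pair
        DataInputBuffer key = iter.getKey();
        DataInputBuffer value = iter.getValue();
        // Valid bytes run from getPosition() to getLength() in each buffer,
        // the same slicing convention used by the raw comparators.
        process(key, value);
      }
    } finally {
      iter.close();                       // releases the underlying streams
    }
  }

  static void process(DataInputBuffer key, DataInputBuffer value) {
    // application-specific handling (illustrative placeholder)
  }
}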

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezSpillRecord.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezSpillRecord.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezSpillRecord.java
deleted file mode 100644
index 19fbd7f..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezSpillRecord.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.sort.impl;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.LongBuffer;
-import java.util.zip.CheckedInputStream;
-import java.util.zip.CheckedOutputStream;
-import java.util.zip.Checksum;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.PureJavaCrc32;
-import org.apache.tez.common.Constants;
-
-public class TezSpillRecord {
-
-  /** Backing store */
-  private final ByteBuffer buf;
-  /** View of backing storage as longs */
-  private final LongBuffer entries;
-
-  public TezSpillRecord(int numPartitions) {
-    buf = ByteBuffer.allocate(
-        numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH);
-    entries = buf.asLongBuffer();
-  }
-
-  public TezSpillRecord(Path indexFileName, Configuration job) throws IOException {
-    this(indexFileName, job, null);
-  }
-
-  public TezSpillRecord(Path indexFileName, Configuration job, String expectedIndexOwner)
-    throws IOException {
-    this(indexFileName, job, new PureJavaCrc32(), expectedIndexOwner);
-  }
-
-  public TezSpillRecord(Path indexFileName, Configuration job, Checksum crc,
-                     String expectedIndexOwner)
-      throws IOException {
-
-    final FileSystem rfs = FileSystem.getLocal(job).getRaw();
-    final FSDataInputStream in = rfs.open(indexFileName);
-    try {
-      final long length = rfs.getFileStatus(indexFileName).getLen();
-      final int partitions = 
-          (int) length / Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
-      final int size = partitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
-
-      buf = ByteBuffer.allocate(size);
-      if (crc != null) {
-        crc.reset();
-        CheckedInputStream chk = new CheckedInputStream(in, crc);
-        IOUtils.readFully(chk, buf.array(), 0, size);
-        if (chk.getChecksum().getValue() != in.readLong()) {
-          throw new ChecksumException("Checksum error reading spill index: " +
-                                indexFileName, -1);
-        }
-      } else {
-        IOUtils.readFully(in, buf.array(), 0, size);
-      }
-      entries = buf.asLongBuffer();
-    } finally {
-      in.close();
-    }
-  }
-
-  /**
-   * Return the number of TezIndexRecord entries in this spill.
-   */
-  public int size() {
-    return entries.capacity() / (Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8);
-  }
-
-  /**
-   * Get spill offsets for given partition.
-   */
-  public TezIndexRecord getIndex(int partition) {
-    final int pos = partition * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8;
-    return new TezIndexRecord(entries.get(pos), entries.get(pos + 1),
-                           entries.get(pos + 2));
-  }
-
-  /**
-   * Set spill offsets for given partition.
-   */
-  public void putIndex(TezIndexRecord rec, int partition) {
-    final int pos = partition * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8;
-    entries.put(pos, rec.getStartOffset());
-    entries.put(pos + 1, rec.getRawLength());
-    entries.put(pos + 2, rec.getPartLength());
-  }
-
-  /**
-   * Write this spill record to the location provided.
-   */
-  public void writeToFile(Path loc, Configuration job)
-      throws IOException {
-    writeToFile(loc, job, new PureJavaCrc32());
-  }
-
-  public void writeToFile(Path loc, Configuration job, Checksum crc)
-      throws IOException {
-    final FileSystem rfs = FileSystem.getLocal(job).getRaw();
-    CheckedOutputStream chk = null;
-    final FSDataOutputStream out = rfs.create(loc);
-    try {
-      if (crc != null) {
-        crc.reset();
-        chk = new CheckedOutputStream(out, crc);
-        chk.write(buf.array());
-        out.writeLong(chk.getChecksum().getValue());
-      } else {
-        out.write(buf.array());
-      }
-    } finally {
-      if (chk != null) {
-        chk.close();
-      } else {
-        out.close();
-      }
-    }
-  }
-
-}
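
The addressing in getIndex()/putIndex() above implies a fixed layout: each
partition's record is three longs - start offset, raw length, and part
(post-compression) length - i.e. 24 bytes, matching
Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH. A minimal standalone sketch of
that layout (the constant is inlined here purely for illustration):

import java.nio.ByteBuffer;
import java.nio.LongBuffer;

public class SpillIndexLayoutSketch {
  // 3 longs per partition = 24 bytes per index record.
  static final int RECORD_LENGTH = 24;

  public static void main(String[] args) {
    int numPartitions = 4;
    ByteBuffer buf = ByteBuffer.allocate(numPartitions * RECORD_LENGTH);
    LongBuffer entries = buf.asLongBuffer();

    // Record for partition 2 starts at long index 2 * (24 / 8) = 6.
    int partition = 2;
    int pos = partition * RECORD_LENGTH / 8;
    entries.put(pos, 1024L);      // startOffset
    entries.put(pos + 1, 512L);   // rawLength
    entries.put(pos + 2, 300L);   // partLength

    // The same addressing recovers the triple, as getIndex() does.
    System.out.println(entries.get(pos) + " " + entries.get(pos + 1)
        + " " + entries.get(pos + 2));    // prints: 1024 512 300
  }
}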