Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:38 UTC

[31/50] [abbrv] Rename tez-engine-api to tez-runtime-api and split tez-engine into two modules:
- tez-engine-library for user-visible Input/Output/Processor implementations
- tez-engine-internals for framework internals

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
new file mode 100644
index 0000000..4ce82d5
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -0,0 +1,559 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.BufferUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.tez.common.counters.TezCounter;
+
+/**
+ * <code>IFile</code> is the simple <key-len, value-len, key, value> format
+ * for the intermediate map-outputs in Map-Reduce.
+ *
+ * There is a <code>Writer</code> to write out map-outputs in this format and 
+ * a <code>Reader</code> to read files of this format.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class IFile {
+  private static final Log LOG = LogFactory.getLog(IFile.class);
+  public static final int EOF_MARKER = -1; // End of File Marker
+  public static final int RLE_MARKER = -2; // Repeat same key marker
+  public static final DataInputBuffer REPEAT_KEY = new DataInputBuffer();
+    
+  /**
+   * <code>IFile.Writer</code> to write out intermediate map-outputs. 
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public static class Writer {
+    FSDataOutputStream out;
+    boolean ownOutputStream = false;
+    long start = 0;
+    FSDataOutputStream rawOut;
+    AtomicBoolean closed = new AtomicBoolean(false);
+    
+    CompressionOutputStream compressedOut;
+    Compressor compressor;
+    boolean compressOutput = false;
+    
+    long decompressedBytesWritten = 0;
+    long compressedBytesWritten = 0;
+
+    // Count records written to disk
+    private long numRecordsWritten = 0;
+    private final TezCounter writtenRecordsCounter;
+
+    IFileOutputStream checksumOut;
+
+    Class keyClass;
+    Class valueClass;
+    Serializer keySerializer;
+    Serializer valueSerializer;
+    
+    DataOutputBuffer buffer = new DataOutputBuffer();
+    DataOutputBuffer previous = new DataOutputBuffer();
+    
+    // de-dup keys or not
+    private boolean rle = false;
+
+    public Writer(Configuration conf, FileSystem fs, Path file, 
+                  Class keyClass, Class valueClass,
+                  CompressionCodec codec,
+                  TezCounter writesCounter) throws IOException {
+      this(conf, fs.create(file), keyClass, valueClass, codec,
+           writesCounter);
+      ownOutputStream = true;
+    }
+    
+    protected Writer(TezCounter writesCounter) {
+      writtenRecordsCounter = writesCounter;
+    }
+
+    public Writer(Configuration conf, FSDataOutputStream out, 
+        Class keyClass, Class valueClass,
+        CompressionCodec codec, TezCounter writesCounter)
+        throws IOException {
+      this.writtenRecordsCounter = writesCounter;
+      this.checksumOut = new IFileOutputStream(out);
+      this.rawOut = out;
+      this.start = this.rawOut.getPos();
+      if (codec != null) {
+        this.compressor = CodecPool.getCompressor(codec);
+        if (this.compressor != null) {
+          this.compressor.reset();
+          this.compressedOut = codec.createOutputStream(checksumOut, compressor);
+          this.out = new FSDataOutputStream(this.compressedOut,  null);
+          this.compressOutput = true;
+        } else {
+          LOG.warn("Could not obtain compressor from CodecPool");
+          this.out = new FSDataOutputStream(checksumOut,null);
+        }
+      } else {
+        this.out = new FSDataOutputStream(checksumOut,null);
+      }
+      
+      this.keyClass = keyClass;
+      this.valueClass = valueClass;
+
+      if (keyClass != null) {
+        SerializationFactory serializationFactory = 
+          new SerializationFactory(conf);
+        this.keySerializer = serializationFactory.getSerializer(keyClass);
+        this.keySerializer.open(buffer);
+        this.valueSerializer = serializationFactory.getSerializer(valueClass);
+        this.valueSerializer.open(buffer);
+      }
+    }
+
+    public Writer(Configuration conf, FileSystem fs, Path file) 
+    throws IOException {
+      this(conf, fs, file, null, null, null, null);
+    }
+
+    public void close() throws IOException {
+      if (closed.getAndSet(true)) {
+        throw new IOException("Writer was already closed earlier");
+      }
+
+      // When IFile writer is created by BackupStore, we do not have
+      // Key and Value classes set. So, check before closing the
+      // serializers
+      if (keyClass != null) {
+        keySerializer.close();
+        valueSerializer.close();
+      }
+
+      // Write EOF_MARKER for key/value length
+      WritableUtils.writeVInt(out, EOF_MARKER);
+      WritableUtils.writeVInt(out, EOF_MARKER);
+      decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
+      
+      //Flush the stream
+      out.flush();
+  
+      if (compressOutput) {
+        // Flush
+        compressedOut.finish();
+        compressedOut.resetState();
+      }
+      
+      // Close the underlying stream iff we own it...
+      if (ownOutputStream) {
+        out.close();
+      }
+      else {
+        // Write the checksum
+        checksumOut.finish();
+      }
+
+      compressedBytesWritten = rawOut.getPos() - start;
+
+      if (compressOutput) {
+        // Return back the compressor
+        CodecPool.returnCompressor(compressor);
+        compressor = null;
+      }
+
+      out = null;
+      if(writtenRecordsCounter != null) {
+        writtenRecordsCounter.increment(numRecordsWritten);
+      }
+    }
+
+    public void append(Object key, Object value) throws IOException {
+      if (key.getClass() != keyClass)
+        throw new IOException("wrong key class: "+ key.getClass()
+                              +" is not "+ keyClass);
+      if (value.getClass() != valueClass)
+        throw new IOException("wrong value class: "+ value.getClass()
+                              +" is not "+ valueClass);
+      
+      boolean sameKey = false;
+
+      // Append the 'key'
+      keySerializer.serialize(key);
+      int keyLength = buffer.getLength();
+      if (keyLength < 0) {
+        throw new IOException("Negative key-length not allowed: " + keyLength + 
+                              " for " + key);
+      }     
+      
+      if(keyLength == previous.getLength()) {
+        sameKey = (BufferUtils.compare(previous, buffer) == 0);       
+      }
+      
+      if(!sameKey) {
+        BufferUtils.copy(buffer, previous);
+      }
+
+      // Append the 'value'
+      valueSerializer.serialize(value);
+      int valueLength = buffer.getLength() - keyLength;
+      if (valueLength < 0) {
+        throw new IOException("Negative value-length not allowed: " + 
+                              valueLength + " for " + value);
+      }
+      
+      if(sameKey) {        
+        WritableUtils.writeVInt(out, RLE_MARKER);                   // Same key as previous
+        WritableUtils.writeVInt(out, valueLength);                  // value length
+        out.write(buffer.getData(), keyLength, buffer.getLength() - keyLength); // only the value
+        // Update bytes written
+        decompressedBytesWritten += 0 + valueLength + 
+                                    WritableUtils.getVIntSize(RLE_MARKER) + 
+                                    WritableUtils.getVIntSize(valueLength);
+      } else {        
+        // Write the record out        
+        WritableUtils.writeVInt(out, keyLength);                  // key length
+        WritableUtils.writeVInt(out, valueLength);                // value length
+        out.write(buffer.getData(), 0, buffer.getLength());       // data
+        // Update bytes written
+        decompressedBytesWritten += keyLength + valueLength + 
+                                    WritableUtils.getVIntSize(keyLength) + 
+                                    WritableUtils.getVIntSize(valueLength);
+      }
+
+      // Reset
+      buffer.reset();
+      
+      
+      ++numRecordsWritten;
+    }
+    
+    public void append(DataInputBuffer key, DataInputBuffer value)
+    throws IOException {
+      int keyLength = key.getLength() - key.getPosition();
+      if (keyLength < 0) {
+        throw new IOException("Negative key-length not allowed: " + keyLength + 
+                              " for " + key);
+      }
+      
+      int valueLength = value.getLength() - value.getPosition();
+      if (valueLength < 0) {
+        throw new IOException("Negative value-length not allowed: " + 
+                              valueLength + " for " + value);
+      }
+      
+      boolean sameKey = false;
+      
+      if(rle && keyLength == previous.getLength()) {
+        sameKey = (keyLength != 0) && (BufferUtils.compare(previous, key) == 0);        
+      }
+      
+      if(rle && sameKey) {
+        WritableUtils.writeVInt(out, RLE_MARKER);
+        WritableUtils.writeVInt(out, valueLength);        
+        out.write(value.getData(), value.getPosition(), valueLength);
+
+        // Update bytes written
+        decompressedBytesWritten += 0 + valueLength
+            + WritableUtils.getVIntSize(RLE_MARKER)
+            + WritableUtils.getVIntSize(valueLength);
+      } else {
+        WritableUtils.writeVInt(out, keyLength);
+        WritableUtils.writeVInt(out, valueLength);
+        out.write(key.getData(), key.getPosition(), keyLength);
+        out.write(value.getData(), value.getPosition(), valueLength);
+
+        // Update bytes written
+        decompressedBytesWritten += keyLength + valueLength
+            + WritableUtils.getVIntSize(keyLength)
+            + WritableUtils.getVIntSize(valueLength);
+                
+        BufferUtils.copy(key, previous);        
+      }
+      ++numRecordsWritten;
+    }
+    
+    // Required for mark/reset
+    public DataOutputStream getOutputStream () {
+      return out;
+    }
+    
+    // Required for mark/reset
+    public void updateCountersForExternalAppend(long length) {
+      ++numRecordsWritten;
+      decompressedBytesWritten += length;
+    }
+    
+    public long getRawLength() {
+      return decompressedBytesWritten;
+    }
+    
+    public long getCompressedLength() {
+      return compressedBytesWritten;
+    }
+    
+    public void setRLE(boolean rle) {
+      this.rle = rle;
+      previous.reset();
+    }
+
+  }
+
+  /**
+   * <code>IFile.Reader</code> to read intermediate map-outputs. 
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public static class Reader {
+    
+    public enum KeyState {NO_KEY, NEW_KEY, SAME_KEY};
+    
+    private static final int DEFAULT_BUFFER_SIZE = 128*1024;
+
+    // Count records read from disk
+    private long numRecordsRead = 0;
+    private final TezCounter readRecordsCounter;
+
+    final InputStream in;        // Possibly decompressed stream that we read
+    Decompressor decompressor;
+    public long bytesRead = 0;
+    protected final long fileLength;
+    protected boolean eof = false;
+    final IFileInputStream checksumIn;
+    
+    protected byte[] buffer = null;
+    protected int bufferSize = DEFAULT_BUFFER_SIZE;
+    protected DataInputStream dataIn;
+
+    protected int recNo = 1;
+    protected int prevKeyLength;
+    protected int currentKeyLength;
+    protected int currentValueLength;
+    byte keyBytes[] = new byte[0];
+    
+    
+    /**
+     * Construct an IFile Reader.
+     * 
+     * @param conf Configuration File 
+     * @param fs  FileSystem
+     * @param file Path of the file to be opened. This file should have
+     *             checksum bytes for the data at the end of the file.
+     * @param codec codec
+     * @param readsCounter Counter for records read from disk
+     * @throws IOException
+     */
+    public Reader(Configuration conf, FileSystem fs, Path file,
+                  CompressionCodec codec,
+                  TezCounter readsCounter) throws IOException {
+      this(conf, fs.open(file), 
+           fs.getFileStatus(file).getLen(),
+           codec, readsCounter);
+    }
+
+    /**
+     * Construct an IFile Reader.
+     * 
+     * @param conf Configuration File 
+     * @param in   The input stream
+     * @param length Length of the data in the stream, including the checksum
+     *               bytes.
+     * @param codec codec
+     * @param readsCounter Counter for records read from disk
+     * @throws IOException
+     */
+    public Reader(Configuration conf, InputStream in, long length, 
+                  CompressionCodec codec,
+                  TezCounter readsCounter) throws IOException {
+      readRecordsCounter = readsCounter;
+      checksumIn = new IFileInputStream(in,length, conf);
+      if (codec != null) {
+        decompressor = CodecPool.getDecompressor(codec);
+        if (decompressor != null) {
+          this.in = codec.createInputStream(checksumIn, decompressor);
+        } else {
+          LOG.warn("Could not obtain decompressor from CodecPool");
+          this.in = checksumIn;
+        }
+      } else {
+        this.in = checksumIn;
+      }
+      this.dataIn = new DataInputStream(this.in);
+      this.fileLength = length;
+      
+      if (conf != null) {
+        bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
+      }
+    }
+    
+    public long getLength() { 
+      return fileLength - checksumIn.getSize();
+    }
+    
+    public long getPosition() throws IOException {    
+      return checksumIn.getPosition(); 
+    }
+    
+    /**
+     * Read up to len bytes into buf starting at offset off.
+     * 
+     * @param buf buffer 
+     * @param off offset
+     * @param len length of buffer
+     * @return the no. of bytes read
+     * @throws IOException
+     */
+    private int readData(byte[] buf, int off, int len) throws IOException {
+      int bytesRead = 0;
+      while (bytesRead < len) {
+        int n = IOUtils.wrappedReadForCompressedData(in, buf, off + bytesRead,
+            len - bytesRead);
+        if (n < 0) {
+          return bytesRead;
+        }
+        bytesRead += n;
+      }
+      return len;
+    }
+    
+    protected boolean positionToNextRecord(DataInput dIn) throws IOException {
+      // Sanity check
+      if (eof) {
+        throw new EOFException("Completed reading " + bytesRead);
+      }
+      
+      // Read key and value lengths
+      prevKeyLength = currentKeyLength;
+      currentKeyLength = WritableUtils.readVInt(dIn);
+      currentValueLength = WritableUtils.readVInt(dIn);
+      bytesRead += WritableUtils.getVIntSize(currentKeyLength) +
+                   WritableUtils.getVIntSize(currentValueLength);
+      
+      // Check for EOF
+      if (currentKeyLength == EOF_MARKER && currentValueLength == EOF_MARKER) {
+        eof = true;
+        return false;
+      }      
+      
+      // Sanity check
+      if (currentKeyLength != RLE_MARKER && currentKeyLength < 0) {
+        throw new IOException("Rec# " + recNo + ": Negative key-length: " + 
+                              currentKeyLength);
+      }
+      if (currentValueLength < 0) {
+        throw new IOException("Rec# " + recNo + ": Negative value-length: " + 
+                              currentValueLength);
+      }
+            
+      return true;
+    }
+    
+    public boolean nextRawKey(DataInputBuffer key) throws IOException {
+      return readRawKey(key) != KeyState.NO_KEY;
+    }
+    
+    public KeyState readRawKey(DataInputBuffer key) throws IOException {
+      if (!positionToNextRecord(dataIn)) {
+        return KeyState.NO_KEY;
+      }
+      if(currentKeyLength == RLE_MARKER) {
+        currentKeyLength = prevKeyLength;
+        // no data to read
+        key.reset(keyBytes, currentKeyLength);
+        return KeyState.SAME_KEY;
+      }
+      if (keyBytes.length < currentKeyLength) {
+        keyBytes = new byte[currentKeyLength << 1];
+      }
+      int i = readData(keyBytes, 0, currentKeyLength);
+      if (i != currentKeyLength) {
+        throw new IOException ("Asked for " + currentKeyLength + " Got: " + i);
+      }
+      key.reset(keyBytes, currentKeyLength);
+      bytesRead += currentKeyLength;
+      return KeyState.NEW_KEY;
+    }
+    
+    public void nextRawValue(DataInputBuffer value) throws IOException {
+      final byte[] valBytes = 
+        ((value.getData().length < currentValueLength) || (value.getData() == keyBytes))
+        ? new byte[currentValueLength << 1]
+        : value.getData();
+      int i = readData(valBytes, 0, currentValueLength);
+      if (i != currentValueLength) {
+        throw new IOException ("Asked for " + currentValueLength + " Got: " + i);
+      }
+      value.reset(valBytes, currentValueLength);
+      
+      // Record the bytes read
+      bytesRead += currentValueLength;
+
+      ++recNo;
+      ++numRecordsRead;
+    }
+    
+    public void close() throws IOException {
+      // Close the underlying stream
+      in.close();
+      
+      // Release the buffer
+      dataIn = null;
+      buffer = null;
+      if(readRecordsCounter != null) {
+        readRecordsCounter.increment(numRecordsRead);
+      }
+
+      // Return the decompressor
+      if (decompressor != null) {
+        decompressor.reset();
+        CodecPool.returnDecompressor(decompressor);
+        decompressor = null;
+      }
+    }
+    
+    public void reset(int offset) {
+      return;
+    }
+
+    public void disableChecksumValidation() {
+      checksumIn.disableChecksumValidation();
+    }
+
+  }    
+}
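
A minimal round-trip sketch of the <key-len, value-len, key, value> framing implemented by the Writer and Reader above. It is illustrative only: the local path, the Text key/value types and the null codec/counter arguments are assumptions, not part of this patch.

    // Hypothetical usage sketch; not part of the commit.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.Text;
    import org.apache.tez.runtime.library.common.sort.impl.IFile;

    public class IFileRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        Path file = new Path("/tmp/example.ifile");      // illustrative path

        // Writer emits <key-len, value-len, key, value> records followed by an
        // <EOF_MARKER, EOF_MARKER> pair; a null codec means no compression.
        IFile.Writer writer = new IFile.Writer(conf, fs, file,
            Text.class, Text.class, null /* codec */, null /* counter */);
        writer.append(new Text("apple"), new Text("1"));
        writer.append(new Text("apple"), new Text("2")); // repeated key is run-length encoded
        writer.close();

        // Reader walks the same framing: nextRawKey() returns false at the EOF
        // marker; an RLE_MARKER record reuses the previously read key bytes.
        IFile.Reader reader = new IFile.Reader(conf, fs, file,
            null /* codec */, null /* counter */);
        DataInputBuffer key = new DataInputBuffer();
        DataInputBuffer value = new DataInputBuffer();
        while (reader.nextRawKey(key)) {
          reader.nextRawValue(value);
          System.out.println("serialized key bytes=" + key.getLength()
              + ", value bytes=" + value.getLength());
        }
        reader.close();
      }
    }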

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
new file mode 100644
index 0000000..e828c0b
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.HasFileDescriptor;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.tez.common.TezJobConfig;
+/**
+ * A checksum input stream, used for IFiles.
+ * Used to validate the checksum of files created by {@link IFileOutputStream}. 
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class IFileInputStream extends InputStream {
+  
+  private final InputStream in; //The input stream to be verified for checksum.
+  private final FileDescriptor inFd; // the file descriptor, if it is known
+  private final long length; //The total length of the input file
+  private final long dataLength;
+  private DataChecksum sum;
+  private long currentOffset = 0;
+  private final byte b[] = new byte[1];
+  private byte csum[] = null;
+  private int checksumSize;
+  private byte[] buffer;
+  private int offset;
+
+  private ReadaheadRequest curReadahead = null;
+  private ReadaheadPool raPool = ReadaheadPool.getInstance();
+  private boolean readahead;
+  private int readaheadLength;
+
+  public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
+
+  private boolean disableChecksumValidation = false;
+  
+  /**
+   * Create a checksum input stream that verifies the data it reads against a trailing checksum.
+   * @param in The input stream to be verified for checksum.
+   * @param len The length of the input stream including checksum bytes.
+   */
+  public IFileInputStream(InputStream in, long len, Configuration conf) {
+    this.in = in;
+    this.inFd = getFileDescriptorIfAvail(in);
+    sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 
+        Integer.MAX_VALUE);
+    checksumSize = sum.getChecksumSize();
+    buffer = new byte[4096];
+    offset = 0;
+    length = len;
+    dataLength = length - checksumSize;
+
+    conf = (conf != null) ? conf : new Configuration();
+    readahead = conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.DEFAULT_TEZ_RUNTIME_IFILE_READAHEAD);
+    readaheadLength = conf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+        TezJobConfig.DEFAULT_TEZ_RUNTIME_IFILE_READAHEAD_BYTES);
+
+    doReadahead();
+  }
+
+  private static FileDescriptor getFileDescriptorIfAvail(InputStream in) {
+    FileDescriptor fd = null;
+    try {
+      if (in instanceof HasFileDescriptor) {
+        fd = ((HasFileDescriptor)in).getFileDescriptor();
+      } else if (in instanceof FileInputStream) {
+        fd = ((FileInputStream)in).getFD();
+      }
+    } catch (IOException e) {
+      LOG.info("Unable to determine FileDescriptor", e);
+    }
+    return fd;
+  }
+
+  /**
+   * Close the input stream. Note that we need to read to the end of the
+   * stream to validate the checksum.
+   */
+  @Override
+  public void close() throws IOException {
+
+    if (curReadahead != null) {
+      curReadahead.cancel();
+    }
+    if (currentOffset < dataLength) {
+      byte[] t = new byte[Math.min((int)
+            (Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)];
+      while (currentOffset < dataLength) {
+        int n = read(t, 0, t.length);
+        if (0 == n) {
+          throw new EOFException("Could not validate checksum");
+        }
+      }
+    }
+    in.close();
+  }
+  
+  @Override
+  public long skip(long n) throws IOException {
+   throw new IOException("Skip not supported for IFileInputStream");
+  }
+  
+  public long getPosition() {
+    return (currentOffset >= dataLength) ? dataLength : currentOffset;
+  }
+  
+  public long getSize() {
+    return checksumSize;
+  }
+
+  private void checksum(byte[] b, int off, int len) {
+    if(len >= buffer.length) {
+      sum.update(buffer, 0, offset);
+      offset = 0;
+      sum.update(b, off, len);
+      return;
+    }
+    final int remaining = buffer.length - offset;
+    if(len > remaining) {
+      sum.update(buffer, 0, offset);
+      offset = 0;
+    }
+    /* now we should have len < buffer.length */
+    System.arraycopy(b, off, buffer, offset, len);
+    offset += len;
+  }
+  
+  /**
+   * Read bytes from the stream.
+   * At EOF, checksum is validated, but the checksum
+   * bytes are not passed back in the buffer. 
+   */
+  public int read(byte[] b, int off, int len) throws IOException {
+
+    if (currentOffset >= dataLength) {
+      return -1;
+    }
+
+    doReadahead();
+
+    return doRead(b,off,len);
+  }
+
+  private void doReadahead() {
+    if (raPool != null && inFd != null && readahead) {
+      curReadahead = raPool.readaheadStream(
+          "ifile", inFd,
+          currentOffset, readaheadLength, dataLength,
+          curReadahead);
+    }
+  }
+
+  /**
+   * Read bytes from the stream.
+   * At EOF, checksum is validated and sent back
+   * as the last four bytes of the buffer. The caller should handle
+   * these bytes appropriately
+   */
+  public int readWithChecksum(byte[] b, int off, int len) throws IOException {
+
+    if (currentOffset == length) {
+      return -1;
+    }
+    else if (currentOffset >= dataLength) {
+      // If the previous read drained off all the data, then just return
+      // the checksum now. Note that checksum validation would have 
+      // happened in the earlier read
+      int lenToCopy = (int) (checksumSize - (currentOffset - dataLength));
+      if (len < lenToCopy) {
+        lenToCopy = len;
+      }
+      System.arraycopy(csum, (int) (currentOffset - dataLength), b, off, 
+          lenToCopy);
+      currentOffset += lenToCopy;
+      return lenToCopy;
+    }
+
+    int bytesRead = doRead(b,off,len);
+
+    if (currentOffset == dataLength) {
+      if (len >= bytesRead + checksumSize) {
+        System.arraycopy(csum, 0, b, off + bytesRead, checksumSize);
+        bytesRead += checksumSize;
+        currentOffset += checksumSize;
+      }
+    }
+    return bytesRead;
+  }
+
+  private int doRead(byte[] b, int off, int len) throws IOException {
+    
+    // If we are trying to read past the end of data, just read
+    // the left over data
+    if (currentOffset + len > dataLength) {
+      len = (int) dataLength - (int)currentOffset;
+    }
+    
+    int bytesRead = in.read(b, off, len);
+
+    if (bytesRead < 0) {
+      throw new ChecksumException("Checksum Error", 0);
+    }
+
+    checksum(b, off, bytesRead);
+
+    currentOffset += bytesRead;
+
+    if (disableChecksumValidation) {
+      return bytesRead;
+    }
+    
+    if (currentOffset == dataLength) {
+      // The last four bytes are checksum. Strip them and verify
+      sum.update(buffer, 0, offset);
+      csum = new byte[checksumSize];
+      IOUtils.readFully(in, csum, 0, checksumSize);
+      if (!sum.compare(csum, 0)) {
+        throw new ChecksumException("Checksum Error", 0);
+      }
+    }
+    return bytesRead;
+  }
+
+
+  @Override
+  public int read() throws IOException {    
+    b[0] = 0;
+    int l = read(b,0,1);
+    if (l < 0)  return l;
+    
+    // Upgrade the b[0] to an int so as not to misinterpret the
+    // first bit of the byte as a sign bit
+    int result = 0xFF & b[0];
+    return result;
+  }
+
+  public byte[] getChecksum() {
+    return csum;
+  }
+
+  void disableChecksumValidation() {
+    disableChecksumValidation = true;
+  }
+}
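
A small sketch of consuming a checksummed stream with IFileInputStream: read() hands back only the data bytes, and the trailing CRC32 is verified once the data has been fully read (close() drains whatever is left, so validation always happens). The file name is an assumption; the file is expected to have been written by the companion IFileOutputStream added in the next hunk.

    // Hypothetical usage sketch; not part of the commit.
    import java.io.File;
    import java.io.FileInputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;

    public class ChecksumReadSketch {
      public static void main(String[] args) throws Exception {
        File f = new File("/tmp/checksummed.bin");     // payload + 4-byte CRC32 trailer
        IFileInputStream in = new IFileInputStream(
            new FileInputStream(f), f.length(), new Configuration());
        byte[] buf = new byte[4096];
        long dataBytes = 0;
        int n;
        while ((n = in.read(buf, 0, buf.length)) > 0) {
          dataBytes += n;                              // checksum bytes are never returned
        }
        in.close();                                    // mismatch -> ChecksumException
        System.out.println("validated " + dataBytes + " data bytes");
      }
    }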

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileOutputStream.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileOutputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileOutputStream.java
new file mode 100644
index 0000000..3198446
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileOutputStream.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.DataChecksum;
+/**
+ * A Checksum output stream.
+ * Checksum for the contents of the file is calculated and
+ * appended to the end of the file on close of the stream.
+ * Used for IFiles
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class IFileOutputStream extends FilterOutputStream {
+
+  /**
+   * The checksum computed over the bytes written to the underlying stream.
+   */
+  private final DataChecksum sum;
+  private byte[] barray;
+  private byte[] buffer;
+  private int offset;
+  private boolean closed = false;
+  private boolean finished = false;
+
+  /**
+   * Create a checksum output stream that writes
+   * the bytes to the given stream.
+   * @param out
+   */
+  public IFileOutputStream(OutputStream out) {
+    super(out);
+    sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
+        Integer.MAX_VALUE);
+    barray = new byte[sum.getChecksumSize()];
+    buffer = new byte[4096];
+    offset = 0;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    finish();
+    out.close();
+  }
+
+  /**
+   * Finishes writing data to the output stream, by writing
+   * the checksum bytes to the end. The underlying stream is not closed.
+   * @throws IOException
+   */
+  public void finish() throws IOException {
+    if (finished) {
+      return;
+    }
+    finished = true;
+    sum.update(buffer, 0, offset);
+    sum.writeValue(barray, 0, false);
+    out.write (barray, 0, sum.getChecksumSize());
+    out.flush();
+  }
+
+  private void checksum(byte[] b, int off, int len) {
+    if(len >= buffer.length) {
+      sum.update(buffer, 0, offset);
+      offset = 0;
+      sum.update(b, off, len);
+      return;
+    }
+    final int remaining = buffer.length - offset;
+    if(len > remaining) {
+      sum.update(buffer, 0, offset);
+      offset = 0;
+    }
+    /*
+    // FIXME if needed re-enable this in debug mode
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("XXX checksum" +
+          " b=" + b + " off=" + off +
+          " buffer=" + " offset=" + offset +
+          " len=" + len);
+    }
+    */
+    /* now we should have len < buffer.length */
+    System.arraycopy(b, off, buffer, offset, len);
+    offset += len;
+  }
+
+  /**
+   * Write bytes to the stream.
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checksum(b, off, len);
+    out.write(b,off,len);
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    barray[0] = (byte) (b & 0xFF);
+    write(barray,0,1);
+  }
+
+}
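
The matching write side, sketched under the same assumptions (illustrative path and payload): every write() feeds the running CRC32, and finish()/close() appends the 4-byte checksum after the payload, which is exactly what IFileInputStream strips off and verifies on the read side.

    // Hypothetical usage sketch; not part of the commit.
    import java.io.File;
    import java.io.FileOutputStream;
    import org.apache.tez.runtime.library.common.sort.impl.IFileOutputStream;

    public class ChecksumWriteSketch {
      public static void main(String[] args) throws Exception {
        File f = new File("/tmp/checksummed.bin");
        byte[] payload = "hello ifile".getBytes("UTF-8");

        IFileOutputStream out = new IFileOutputStream(new FileOutputStream(f));
        out.write(payload, 0, payload.length);  // contributes to the running CRC32
        out.close();                            // finish() appends the 4-byte checksum

        // On disk: payload.length data bytes followed by 4 checksum bytes.
        System.out.println("file length = " + f.length()
            + " (expected " + (payload.length + 4) + ")");
      }
    }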

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
new file mode 100644
index 0000000..1b153ca
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -0,0 +1,932 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.HashComparator;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.IndexedSorter;
+import org.apache.hadoop.util.Progress;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class PipelinedSorter extends ExternalSorter {
+  
+  private static final Log LOG = LogFactory.getLog(PipelinedSorter.class);
+  
+  /**
+   * The size of each record in the index file for the map-outputs.
+   */
+  public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
+
+  private final static int APPROX_HEADER_LENGTH = 150;
+    
+  int partitionBits;
+  
+  private static final int PARTITION = 0;        // partition offset in acct
+  private static final int KEYSTART = 1;         // key offset in acct
+  private static final int VALSTART = 2;         // val offset in acct
+  private static final int VALLEN = 3;           // val len in acct
+  private static final int NMETA = 4;            // num meta ints
+  private static final int METASIZE = NMETA * 4; // size in bytes
+
+  // spill accounting
+  volatile Throwable sortSpillException = null;
+
+  int numSpills = 0;
+  int minSpillsForCombine;
+  private HashComparator hasher;
+  // SortSpans  
+  private SortSpan span;
+  private ByteBuffer largeBuffer;
+  // Merger
+  private SpanMerger merger; 
+  private ExecutorService sortmaster;
+
+  final ArrayList<TezSpillRecord> indexCacheList =
+    new ArrayList<TezSpillRecord>();
+  private int totalIndexCacheMemory;
+  private int indexCacheMemoryLimit;
+
+  public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
+    super.initialize(outputContext, conf, numOutputs);
+    
+    partitionBits = bitcount(partitions)+1;
+   
+    //sanity checks
+    final float spillper =
+      this.conf.getFloat(
+          TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT, 
+          TezJobConfig.DEFAULT_TEZ_RUNTIME_SORT_SPILL_PERCENT);
+    final int sortmb = 
+        this.conf.getInt(
+            TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 
+            TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_MB);
+    indexCacheMemoryLimit = this.conf.getInt(TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES,
+                                       TezJobConfig.DEFAULT_TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
+    if (spillper > (float)1.0 || spillper <= (float)0.0) {
+      throw new IOException("Invalid \"" + TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT +
+          "\": " + spillper);
+    }
+    if ((sortmb & 0x7FF) != sortmb) {
+      throw new IOException(
+          "Invalid \"" + TezJobConfig.TEZ_RUNTIME_IO_SORT_MB + "\": " + sortmb);
+    }
+    
+    // buffers and accounting
+    int maxMemUsage = sortmb << 20;
+    maxMemUsage -= maxMemUsage % METASIZE;
+    largeBuffer = ByteBuffer.allocate(maxMemUsage);
+    LOG.info(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB + " = " + sortmb);
+    // TODO: configurable setting?
+    span = new SortSpan(largeBuffer, 1024*1024, 16);
+    merger = new SpanMerger(comparator);
+    final int sortThreads = 
+            this.conf.getInt(
+                TezJobConfig.TEZ_RUNTIME_SORT_THREADS, 
+                TezJobConfig.DEFAULT_TEZ_RUNTIME_SORT_THREADS);
+    sortmaster = Executors.newFixedThreadPool(sortThreads);
+
+    // k/v serialization    
+    if(comparator instanceof HashComparator) {
+      hasher = (HashComparator)comparator;
+      LOG.info("Using the HashComparator");
+    } else {
+      hasher = null;
+    }    
+    valSerializer.open(span.out);
+    keySerializer.open(span.out);
+    minSpillsForCombine = this.conf.getInt(TezJobConfig.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
+  }
+
+  private int bitcount(int n) {
+    int bit = 0;
+    while(n!=0) {
+      bit++;
+      n >>= 1;
+    }
+    return bit;
+  }
+  
+  public void sort() throws IOException {
+    SortSpan newSpan = span.next();
+
+    if(newSpan == null) {
+      // sort in the same thread, do not wait for the thread pool
+      merger.add(span.sort(sorter, comparator));
+      spill();
+      int items = 1024*1024;
+      int perItem = 16;
+      if(span.length() != 0) {
+        items = span.length();
+        perItem = span.kvbuffer.limit()/items;
+        items = (largeBuffer.capacity())/(METASIZE+perItem);
+        if(items > 1024*1024) {
+            // our goal is to have 1M splits and sort early
+            items = 1024*1024;
+        }
+      }      
+      span = new SortSpan(largeBuffer, items, perItem);
+    } else {
+      // queue up the sort
+      SortTask task = new SortTask(span, sorter, comparator);
+      Future<SpanIterator> future = sortmaster.submit(task);
+      merger.add(future);
+      span = newSpan;
+    }
+    valSerializer.open(span.out);
+    keySerializer.open(span.out);
+  }
+
+  @Override
+  public void write(Object key, Object value) 
+      throws IOException {
+    collect(
+        key, value, partitioner.getPartition(key, value, partitions));
+  }
+
+  /**
+   * Serialize the key, value to intermediate storage.
+   * When this method returns, kvindex must refer to sufficient unused
+   * storage to store one METADATA.
+   */
+  synchronized void collect(Object key, Object value, final int partition
+                                   ) throws IOException {
+    if (key.getClass() != keyClass) {
+      throw new IOException("Type mismatch in key from map: expected "
+                            + keyClass.getName() + ", received "
+                            + key.getClass().getName());
+    }
+    if (value.getClass() != valClass) {
+      throw new IOException("Type mismatch in value from map: expected "
+                            + valClass.getName() + ", received "
+                            + value.getClass().getName());
+    }
+    if (partition < 0 || partition >= partitions) {
+      throw new IOException("Illegal partition for " + key + " (" +
+          partition + ")");
+    }
+    if(span.kvmeta.remaining() < METASIZE) {
+      this.sort();
+    }
+    int keystart = span.kvbuffer.position();
+    int valstart = -1;
+    int valend = -1;
+    try { 
+      keySerializer.serialize(key);
+      valstart = span.kvbuffer.position();      
+      valSerializer.serialize(value);
+      valend = span.kvbuffer.position();
+    } catch(BufferOverflowException overflow) {
+      // restore limit
+      span.kvbuffer.position(keystart);
+      this.sort();
+      // try again
+      this.collect(key, value, partition);
+      return;
+    }
+
+    int prefix = 0;
+
+    if(hasher != null) {
+      prefix = hasher.getHashCode(key);
+    }
+
+    prefix = (partition << (32 - partitionBits)) | (prefix >>> partitionBits);
+
+    /* maintain order as in PARTITION, KEYSTART, VALSTART, VALLEN */
+    span.kvmeta.put(prefix);
+    span.kvmeta.put(keystart);
+    span.kvmeta.put(valstart);
+    span.kvmeta.put(valend - valstart);
+    if((valstart - keystart) > span.keymax) {
+      span.keymax = (valstart - keystart);
+    }
+    if((valend - valstart) > span.valmax) {
+      span.valmax = (valend - valstart);
+    }
+    mapOutputRecordCounter.increment(1);
+    mapOutputByteCounter.increment(valend - keystart);
+  }
+
+  public void spill() throws IOException { 
+    // create spill file
+    final long size = largeBuffer.capacity() + 
+      (partitions * APPROX_HEADER_LENGTH);
+    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+    final Path filename =
+      mapOutputFile.getSpillFileForWrite(numSpills, size);    
+    FSDataOutputStream out = rfs.create(filename, true, 4096);
+
+    try {
+      merger.ready(); // wait for all the future results from sort threads
+      LOG.info("Spilling to " + filename.toString());
+      for (int i = 0; i < partitions; ++i) {
+        TezRawKeyValueIterator kvIter = merger.filter(i);
+        //write merged output to disk
+        long segmentStart = out.getPos();
+        Writer writer =
+          new Writer(conf, out, keyClass, valClass, codec,
+              spilledRecordsCounter);
+        writer.setRLE(merger.needsRLE());
+        if (combiner == null) {
+          while(kvIter.next()) {
+            writer.append(kvIter.getKey(), kvIter.getValue());
+          }
+        } else {          
+          runCombineProcessor(kvIter, writer);
+        }
+        //close
+        writer.close();
+
+        // record offsets
+        final TezIndexRecord rec = 
+            new TezIndexRecord(
+                segmentStart, 
+                writer.getRawLength(), 
+                writer.getCompressedLength());
+        spillRec.putIndex(rec, i);
+      }
+
+      Path indexFilename =
+        mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+            * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+      // TODO: cache
+      spillRec.writeToFile(indexFilename, conf);
+      ++numSpills;
+    } catch(InterruptedException ie) {
+      // TODO:the combiner has been interrupted
+    } finally {
+      out.close();
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    final String uniqueIdentifier = outputContext.getUniqueIdentifier();
+    Path finalOutputFile =
+        mapOutputFile.getOutputFileForWrite(0); //TODO
+    Path finalIndexFile =
+        mapOutputFile.getOutputIndexFileForWrite(0); //TODO
+
+    LOG.info("Starting flush of map output");
+    span.end();
+    merger.add(span.sort(sorter, comparator));
+    spill();
+    sortmaster.shutdown();
+
+    largeBuffer = null;
+
+    if(numSpills == 1) {
+      // someday be able to pass this directly to shuffle
+      // without writing to disk
+      final Path filename =
+          mapOutputFile.getSpillFile(0);
+      Path indexFilename =
+              mapOutputFile.getSpillIndexFile(0);
+      sameVolRename(filename, finalOutputFile);
+      sameVolRename(indexFilename, finalIndexFile);
+      return;
+    }
+    
+    //The output stream for the final single output file
+    FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
+
+    TezMerger.considerFinalMergeForProgress();
+
+    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+    final ArrayList<TezSpillRecord> indexCacheList = new ArrayList<TezSpillRecord>();
+
+    for(int i = 0; i < numSpills; i++) {
+      // TODO: build this cache before
+      Path indexFilename = mapOutputFile.getSpillIndexFile(i);
+      TezSpillRecord spillIndex = new TezSpillRecord(indexFilename, conf);
+      indexCacheList.add(spillIndex);
+    }
+    
+    for (int parts = 0; parts < partitions; parts++) {
+      //create the segments to be merged
+      List<Segment> segmentList =
+          new ArrayList<Segment>(numSpills);
+      for(int i = 0; i < numSpills; i++) {
+        Path spillFilename = mapOutputFile.getSpillFile(i);
+        TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
+
+        Segment s =
+            new Segment(conf, rfs, spillFilename, indexRecord.getStartOffset(),
+                             indexRecord.getPartLength(), codec, true);
+        segmentList.add(i, s);
+      }
+
+      int mergeFactor = 
+              this.conf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 
+                  TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_FACTOR);
+      // sort the segments only if there are intermediate merges
+      boolean sortSegments = segmentList.size() > mergeFactor;
+      //merge
+      TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
+                     keyClass, valClass, codec,
+                     segmentList, mergeFactor,
+                     new Path(uniqueIdentifier),
+                     (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf), 
+                     nullProgressable, sortSegments,
+                     null, spilledRecordsCounter,
+                     null); // Not using any Progress in TezMerger. Should just work.
+
+      //write merged output to disk
+      long segmentStart = finalOut.getPos();
+      Writer writer =
+          new Writer(conf, finalOut, keyClass, valClass, codec,
+                           spilledRecordsCounter);
+      writer.setRLE(merger.needsRLE());
+      if (combiner == null || numSpills < minSpillsForCombine) {
+        TezMerger.writeFile(kvIter, writer, nullProgressable, conf);
+      } else {
+        runCombineProcessor(kvIter, writer);
+      }
+
+      //close
+      writer.close();
+
+      // record offsets
+      final TezIndexRecord rec = 
+          new TezIndexRecord(
+              segmentStart, 
+              writer.getRawLength(), 
+              writer.getCompressedLength());
+      spillRec.putIndex(rec, parts);
+    }
+
+    spillRec.writeToFile(finalIndexFile, conf);
+    finalOut.close();
+    for(int i = 0; i < numSpills; i++) {
+      Path indexFilename = mapOutputFile.getSpillIndexFile(i);
+      Path spillFilename = mapOutputFile.getSpillFile(i);
+      rfs.delete(indexFilename,true);
+      rfs.delete(spillFilename,true);
+    }
+  }
+
+  public void close() { }
+
+  private interface PartitionedRawKeyValueIterator extends TezRawKeyValueIterator {
+    int getPartition();
+  }
+
+  private class BufferStreamWrapper extends OutputStream 
+  {
+    private final ByteBuffer out;
+    public BufferStreamWrapper(ByteBuffer out) {
+      this.out = out;
+    }
+    
+    @Override
+    public void write(int b) throws IOException { out.put((byte)b); }
+    @Override
+    public void write(byte[] b) throws IOException { out.put(b); }
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException { out.put(b, off, len); }
+  }
+
+  protected class InputByteBuffer extends DataInputBuffer {
+    private byte[] buffer = new byte[256]; 
+    private ByteBuffer wrapped = ByteBuffer.wrap(buffer);
+    private void resize(int length) {
+      if(length > buffer.length) {
+        buffer = new byte[length];
+        wrapped = ByteBuffer.wrap(buffer);
+      }
+      wrapped.limit(length);
+    }
+    public void reset(ByteBuffer b, int start, int length) {
+      resize(length);
+      b.position(start);
+      b.get(buffer, 0, length);
+      super.reset(buffer, 0, length);
+    }
+    // clone-ish function
+    public void reset(DataInputBuffer clone) {
+      byte[] data = clone.getData();
+      int start = clone.getPosition();
+      int length = clone.getLength();
+      resize(length);
+      System.arraycopy(data, start, buffer, 0, length);
+      super.reset(buffer, 0, length);
+    }
+  }
+
+  private class SortSpan  implements IndexedSortable {
+    final IntBuffer kvmeta;
+    final ByteBuffer kvbuffer;
+    final DataOutputStream out;    
+    private RawComparator comparator; 
+    final int imeta[] = new int[NMETA];
+    final int jmeta[] = new int[NMETA];
+    int keymax = 1;
+    int valmax = 1;
+    private int i,j;
+    private byte[] ki;
+    private byte[] kj;
+    private int index = 0;
+    private InputByteBuffer hay = new InputByteBuffer();
+    private long eq = 0;
+
+    public SortSpan(ByteBuffer source, int maxItems, int perItem) {
+      int capacity = source.remaining(); 
+      int metasize = METASIZE*maxItems;
+      int dataSize = maxItems * perItem;
+      if(capacity < (metasize+dataSize)) {
+        // try to allocate less meta space, because we have sample data
+        metasize = METASIZE*(capacity/(perItem+METASIZE));
+      }
+      ByteBuffer reserved = source.duplicate();
+      reserved.mark();
+      LOG.info("reserved.remaining() = "+reserved.remaining());
+      LOG.info("reserved.size = "+metasize);
+      reserved.position(metasize);
+      kvbuffer = reserved.slice();
+      reserved.flip();
+      reserved.limit(metasize);
+      kvmeta = reserved
+                .slice()
+                .order(ByteOrder.nativeOrder())
+               .asIntBuffer();
+      out = new DataOutputStream(
+              new BufferStreamWrapper(kvbuffer));
+    }
+
+    public SpanIterator sort(IndexedSorter sorter, RawComparator comparator) {
+    	this.comparator = comparator;
+      ki = new byte[keymax];
+      kj = new byte[keymax];
+      LOG.info("begin sorting Span"+index + " ("+length()+")");
+      if(length() > 1) {
+        sorter.sort(this, 0, length(), nullProgressable);
+      }
+      LOG.info("done sorting Span"+index);
+      return new SpanIterator(this);
+    }
+
+    int offsetFor(int i) {
+      return (i * NMETA);
+    }
+
+    public void swap(final int mi, final int mj) {
+      final int kvi = offsetFor(mi);
+      final int kvj = offsetFor(mj);
+
+      kvmeta.position(kvi); kvmeta.get(imeta);
+      kvmeta.position(kvj); kvmeta.get(jmeta);
+      kvmeta.position(kvj); kvmeta.put(imeta);
+      kvmeta.position(kvi); kvmeta.put(jmeta);
+
+      if(i == mi || j == mj) i = -1;
+      if(i == mi || j == mj) j = -1;
+    }
+
+    public int compare(final int mi, final int mj) {
+      final int kvi = offsetFor(mi);
+      final int kvj = offsetFor(mj);
+      final int kvip = kvmeta.get(kvi + PARTITION);
+      final int kvjp = kvmeta.get(kvj + PARTITION);
+      // sort by partition      
+      if (kvip != kvjp) {
+        return kvip - kvjp;
+      }
+      
+      final int istart = kvmeta.get(kvi + KEYSTART);
+      final int jstart = kvmeta.get(kvj + KEYSTART);
+      final int ilen   = kvmeta.get(kvi + VALSTART) - istart;
+      final int jlen   = kvmeta.get(kvj + VALSTART) - jstart;
+
+      kvbuffer.position(istart);
+      kvbuffer.get(ki, 0, ilen);
+      kvbuffer.position(jstart);
+      kvbuffer.get(kj, 0, jlen);
+      // sort by key
+      final int cmp = comparator.compare(ki, 0, ilen, kj, 0, jlen);
+      if(cmp == 0) eq++;
+      return cmp;
+    }
+
+    public SortSpan next() {
+      ByteBuffer remaining = end();
+      if(remaining != null) {
+        int items = length();
+        int perItem = kvbuffer.position()/items;
+        SortSpan newSpan = new SortSpan(remaining, items, perItem);
+        newSpan.index = index+1;
+        return newSpan;
+      }
+      return null;
+    }
+
+    public int length() {
+      return kvmeta.limit()/NMETA;
+    }
+
+    public ByteBuffer end() {
+      ByteBuffer remaining = kvbuffer.duplicate();
+      remaining.position(kvbuffer.position());
+      remaining = remaining.slice();
+      kvbuffer.limit(kvbuffer.position());
+      kvmeta.limit(kvmeta.position());
+      int items = length();
+      if(items == 0) {
+        return null;
+      }
+      int perItem = kvbuffer.position()/items;
+      LOG.info(String.format("Span%d.length = %d, perItem = %d", index, length(), perItem));
+      if(remaining.remaining() < NMETA+perItem) {
+        return null;
+      }
+      return remaining;
+    }
+
+    private int compareInternal(DataInputBuffer needle, int needlePart, int index) {
+      int cmp = 0;
+      int keystart;
+      int valstart;
+      int partition;
+      partition = kvmeta.get(span.offsetFor(index) + PARTITION);
+      if(partition != needlePart) {
+          cmp = (partition-needlePart);
+      } else {
+        keystart = kvmeta.get(span.offsetFor(index) + KEYSTART);
+        valstart = kvmeta.get(span.offsetFor(index) + VALSTART);
+        // hay is allocated ahead of time
+        hay.reset(kvbuffer, keystart, valstart - keystart);
+        cmp = comparator.compare(hay.getData(), 
+            hay.getPosition(), hay.getLength(),
+            needle.getData(), 
+            needle.getPosition(), needle.getLength());
+      }
+      return cmp;
+    }
+    
+    public long getEq() {
+      return eq;
+    }
+    
+    @Override
+    public String toString() {
+        return String.format("Span[%d,%d]", NMETA*kvmeta.capacity(), kvbuffer.limit());
+    }
+  }
+
+  private class SpanIterator implements PartitionedRawKeyValueIterator, Comparable<SpanIterator> {
+    private int kvindex = -1;
+    private int maxindex;
+    private IntBuffer kvmeta;
+    private ByteBuffer kvbuffer;
+    private SortSpan span;
+    private InputByteBuffer key = new InputByteBuffer();
+    private InputByteBuffer value = new InputByteBuffer();
+    private Progress progress = new Progress();
+
+    private final int minrun = (1 << 4);
+
+    public SpanIterator(SortSpan span) {
+      this.kvmeta = span.kvmeta;
+      this.kvbuffer = span.kvbuffer;
+      this.span = span;
+      this.maxindex = (kvmeta.limit()/NMETA) - 1;
+    }
+
+    public DataInputBuffer getKey() throws IOException {
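+      // the key occupies [keystart, valstart) in the span's kvbuffer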
+      final int keystart = kvmeta.get(span.offsetFor(kvindex) + KEYSTART);
+      final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART);
+      key.reset(kvbuffer, keystart, valstart - keystart);
+      return key;
+    }
+
+    public DataInputBuffer getValue() throws IOException {
+      final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART);
+      final int vallen = kvmeta.get(span.offsetFor(kvindex) + VALLEN);
+      value.reset(kvbuffer, valstart, vallen);
+      return value;
+    }
+
+    public boolean next() throws IOException {
+      // caveat: since we use this as a comparable in the merger 
+      if(kvindex == maxindex) return false;
+      if(kvindex % 100 == 0) {
+          progress.set((float) kvindex / maxindex);
+      }
+      kvindex += 1;
+      return true;
+    }
+
+    public void close() throws IOException {
+    }
+
+    public Progress getProgress() { 
+      return progress;
+    }
+
+    public int getPartition() {
+      final int partition = kvmeta.get(span.offsetFor(kvindex) + PARTITION);
+      return partition;
+    }
+
+    public int size() {
+      return (maxindex - kvindex);
+    }
+
+    public int compareTo(SpanIterator other) {
+      try {
+        return span.compareInternal(other.getKey(), other.getPartition(), kvindex);
+      } catch(IOException ie) {
+        // getKey() never reads from disk here, so this cannot happen in practice
+      }
+      return -1;
+    }
+    
+    @Override
+    public String toString() {
+        return String.format("SpanIterator<%d:%d> (span=%s)", kvindex, maxindex, span.toString());
+    }
+
+    /**
+     * bisect returns the next insertion point for a given raw key, skipping keys
+     * which are <= needle with a binary search instead of a linear comparison.
+     * This is far more efficient when long runs of identical keys occur.
+     * @param needle the raw key to search for
+     * @param needlePart the partition the needle belongs to
+     * @return the number of entries past the current position known to be
+     *         <= needle, or 0 if there are too few to make skipping worthwhile
+     */
+    int bisect(DataInputBuffer needle, int needlePart) {
+      int start = kvindex;
+      int end = maxindex-1;
+      int mid = start;
+      int cmp = 0;
+
+      if(end - start < minrun) {
+        return 0;
+      }
+
+      if(span.compareInternal(needle, needlePart, start) > 0) {
+        return kvindex;
+      }
+      
+      // bail out early if we haven't got a min run 
+      if(span.compareInternal(needle, needlePart, start+minrun) > 0) {
+        return 0;
+      }
+
+      if(span.compareInternal(needle, needlePart, end) < 0) {
+        return end - kvindex;
+      }
+      
+      boolean found = false;
+      
+      // we sort ~100k items per span, so ~17 bisection steps would fully
+      // converge; cap the loop at 16 and let the range collapse early
+      for(int i = 0; start < end && i < 16; i++) {
+        mid = start + (end - start)/2;
+        cmp = span.compareInternal(needle, needlePart, mid);
+        if(cmp <= 0) {
+          start = mid;
+          found = true;
+        } else {
+          end = mid;
+        }
+      }
+
+      if(found) {
+        return start - kvindex;
+      }
+      return 0;
+    }
+  }
+
+  private class SortTask implements Callable<SpanIterator> {
+    private final SortSpan sortable;
+    private final IndexedSorter sorter;
+    private final RawComparator comparator;
+    
+    public SortTask(SortSpan sortable, 
+              IndexedSorter sorter, RawComparator comparator) {
+        this.sortable = sortable;
+        this.sorter = sorter;
+        this.comparator = comparator;
+    }
+
+    public SpanIterator call() {
+      return sortable.sort(sorter, comparator);
+    }
+  }
+
+  private class PartitionFilter implements TezRawKeyValueIterator {
+    private final PartitionedRawKeyValueIterator iter;
+    private int partition;
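+    // 'dirty' marks a record that was read ahead but belongs to a different
+    // partition; it is re-checked on the next call instead of being consumed again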
+    private boolean dirty = false;
+    public PartitionFilter(PartitionedRawKeyValueIterator iter) {
+      this.iter = iter;
+    }
+    public DataInputBuffer getKey() throws IOException { return iter.getKey(); }
+    public DataInputBuffer getValue() throws IOException { return iter.getValue(); }
+    public void close() throws IOException { }
+    public Progress getProgress() {
+      return new Progress();
+    }
+    public boolean next() throws IOException {
+      if(dirty || iter.next()) { 
+        int prefix = iter.getPartition();
+
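+        // the partition id is carried in the top partitionBits bits of the prefix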
+        if((prefix >>> (32 - partitionBits)) == partition) {
+          dirty = false; // we found what we were looking for, good
+          return true;
+        } else if(!dirty) {
+          dirty = true; // we did a lookahead and failed to find partition
+        }
+      }
+      return false;
+    }
+
+    public void reset(int partition) {
+      this.partition = partition;
+    }
+
+    public int getPartition() {
+      return this.partition;
+    }
+  }
+
+  private class SpanHeap extends java.util.PriorityQueue<SpanIterator> {
+    public SpanHeap() {
+      super(256);
+    }
+    /**
+     * {@link java.util.PriorityQueue#poll()} by a different name.
+     * @return the smallest remaining SpanIterator, or null if the heap is empty
+     */
+    public SpanIterator pop() {
+      return this.poll();
+    }
+  }
+
+  private class SpanMerger implements PartitionedRawKeyValueIterator {
+    private final RawComparator comparator;
+    InputByteBuffer key = new InputByteBuffer();
+    InputByteBuffer value = new InputByteBuffer();
+    int partition;
+
+    private ArrayList<Future<SpanIterator>> futures = new ArrayList<Future<SpanIterator>>();
+
+    private SpanHeap heap = new SpanHeap();
+    private PartitionFilter partIter;
+
+    private int gallop = 0;
+    private SpanIterator horse;
+    private long total = 0;
+    private long count = 0;
+    private long eq = 0;
+    
+    public SpanMerger(RawComparator comparator) {
+      this.comparator = comparator;
+      partIter = new PartitionFilter(this);
+    }
+
+    public void add(SpanIterator iter) throws IOException{
+      if(iter.next()) {
+        heap.add(iter);
+      }
+    }
+
+    public void add(Future<SpanIterator> iter) throws IOException{
+      this.futures.add(iter);
+    }
+
+    public boolean ready() throws IOException, InterruptedException {
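+      // wait for every background SortTask to finish, then seed the heap with
+      // the resulting span iterators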
+      try {
+        SpanIterator iter = null;
+        while(this.futures.size() > 0) {
+          Future<SpanIterator> futureIter = this.futures.remove(0);
+          iter = futureIter.get();
+          this.add(iter);
+        }
+        
+        StringBuilder sb = new StringBuilder();
+        for(SpanIterator sp: heap) {
+            sb.append(sp.toString());
+            sb.append(",");
+            total += sp.span.length();
+            eq += sp.span.getEq();
+        }
+        LOG.info("Heap = " + sb.toString());
+        return true;
+      } catch(Exception e) {
+        LOG.info("SpanMerger failed to become ready", e);
+        return false;
+      }
+    }
+
+    private SpanIterator pop() throws IOException {
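+      // "galloping": while the previously popped iterator (horse) is known to stay
+      // <= the next heap head for `gallop` more entries, keep returning it directly
+      // instead of re-heapifying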
+      if(gallop > 0) {
+        gallop--;
+        return horse;
+      }
+      SpanIterator current = heap.pop();
+      SpanIterator next = heap.peek();
+      if(next != null && current != null && horse == current) {
+        // TODO: a better threshold check
+        gallop = current.bisect(next.getKey(), next.getPartition())-1;
+      }
+      horse = current;
+      return current;
+    }
+    
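+    // heuristic: if more than 10% of the key comparisons made while sorting hit
+    // equal keys, run-length encoding of keys is likely to pay off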
+    public boolean needsRLE() {
+      return (eq > 0.1 * total);
+    }
+    
+    private SpanIterator peek() throws IOException {
+      if(gallop > 0) {
+        return horse;
+      }
+      return heap.peek();
+    }
+
+    public boolean next() throws IOException {
+      SpanIterator current = pop();
+
+      if(current != null) {
+        // keep local copies of the key/value, since add() advances the iterator past them
+        key.reset(current.getKey());
+        value.reset(current.getValue());
+        partition = current.getPartition();
+        if(gallop <= 0) {
+          this.add(current);
+        } else {
+          // galloping
+          current.next();
+        }
+        return true;
+      }
+      return false;
+    }
+
+    public DataInputBuffer getKey() throws IOException { return key; }
+    public DataInputBuffer getValue() throws IOException { return value; }
+    public int getPartition() { return partition; }
+
+    public void close() throws IOException {
+    }
+
+    public Progress getProgress() {
+      // TODO
+      return new Progress();
+    }
+
+    public TezRawKeyValueIterator filter(int partition) {
+      partIter.reset(partition);
+      return partIter;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
new file mode 100644
index 0000000..95ae8eb
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common.sort.impl;
+
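+/**
+ * Describes a single partition's segment of a sorted output (or spill) file:
+ * the offset at which it starts, its raw (uncompressed) length, and its part
+ * (on-disk, possibly compressed) length.
+ */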
+public class TezIndexRecord {
+  private long startOffset;
+  private long rawLength;
+  private long partLength;
+
+  public TezIndexRecord() { }
+
+  public TezIndexRecord(long startOffset, long rawLength, long partLength) {
+    this.startOffset = startOffset;
+    this.rawLength = rawLength;
+    this.partLength = partLength;
+  }
+
+  public long getStartOffset() {
+    return startOffset;
+  }
+
+  public long getRawLength() {
+    return rawLength;
+  }
+
+  public long getPartLength() {
+    return partLength;
+  }
+}