Posted to commits@tez.apache.org by ss...@apache.org on 2014/05/01 08:46:11 UTC

git commit: TEZ-661. Add an implementation for a non sorted, partitioned, key-value output. (sseth)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 2275c4ee1 -> 26ad7037c


TEZ-661. Add an implementation for a non sorted, partitioned, key-value
output. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/26ad7037
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/26ad7037
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/26ad7037

Branch: refs/heads/master
Commit: 26ad7037c3c01347be63ad33810faee0a3922a94
Parents: 2275c4e
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Apr 30 23:45:21 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Apr 30 23:45:21 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/common/TezJobConfig.java     |  15 +
 .../apache/tez/common/counters/TaskCounter.java |   6 +
 .../tez/mapreduce/partition/MRPartitioner.java  |   9 +-
 .../common/sort/impl/TezIndexRecord.java        |   5 +
 .../BaseUnorderedPartitionedKVWriter.java       | 161 ++++
 .../writers/UnorderedPartitionedKVWriter.java   | 791 +++++++++++++++++++
 .../OnFileUnorderedPartitionedKVOutput.java     |  78 +-
 .../library/partitioner/HashPartitioner.java    |  30 +
 .../WeightedScalingMemoryDistributor.java       |   3 +
 .../TestUnorderedPartitionedKVWriter.java       | 673 ++++++++++++++++
 10 files changed, 1763 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/26ad7037/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index ae122ff..8a61c39 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -132,6 +132,21 @@ public class TezJobConfig {
   public static final int DEFAULT_TEZ_RUNTIME_SORT_THREADS = 1;
 
   /**
+   * Size of the buffer to use if not writing directly to disk.
+   */
+  public static final String TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB = 
+      "tez.runtime.unordered.output.buffer.size-mb";
+  public static final int TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB_DEFAULT = 100;
+  
+  /**
+   * Maximum size for individual buffers used in the unordered partitioned output.
+   * This is only meant to be used by unit tests.
+   */
+  @Private
+  public static final String TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES =
+      "tez.runtime.unordered.output.max-per-buffer.size-bytes";
+  
+  /**
    * Specifies a partitioner class, which is used in Tez Runtime components
    * like OnFileSortedOutput
    */
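
A note on usage (illustrative only, not part of this patch): the two keys above are read through the standard Hadoop Configuration API. A minimal sketch of raising the buffer size for a job:

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezJobConfig;

public class UnorderedBufferConfigSketch {
  public static Configuration withLargerBuffer() {
    Configuration conf = new Configuration();
    // Request a 200 MB in-memory buffer instead of the 100 MB default.
    conf.setInt(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 200);
    return conf;
  }
}

The @Private max-per-buffer key is left alone here; as its javadoc says, it exists only so unit tests can force small buffers.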

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/26ad7037/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index 731d4ac..8ebc6c8 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -91,6 +91,12 @@ public enum TaskCounter {
    */
   OUTPUT_RECORDS,
   
+  /**
+   * Represents the number of large records in the output - typically, records
+   * which are spilled directly to disk.
+   */
+  OUTPUT_LARGE_RECORDS,
+  
   SKIPPED_RECORDS, // Not used at the moment.
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/26ad7037/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
index 224900e..c8edcf3 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
@@ -33,16 +33,16 @@ public class MRPartitioner implements org.apache.tez.runtime.library.api.Partiti
   static final Log LOG = LogFactory.getLog(MRPartitioner.class);
 
   private final boolean useNewApi;
-  private int partitions = 1;
 
-  private org.apache.hadoop.mapreduce.Partitioner newPartitioner;
-  private org.apache.hadoop.mapred.Partitioner oldPartitioner;
+  private final org.apache.hadoop.mapreduce.Partitioner newPartitioner;
+  private final org.apache.hadoop.mapred.Partitioner oldPartitioner;
 
   public MRPartitioner(Configuration conf) {
     this.useNewApi = ConfigUtils.useNewApi(conf);
-    this.partitions = conf.getInt(TezJobConfig.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, 1);
+    int partitions = conf.getInt(TezJobConfig.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, 1);
 
     if (useNewApi) {
+      oldPartitioner = null;
       if (partitions > 1) {
         newPartitioner = (org.apache.hadoop.mapreduce.Partitioner) ReflectionUtils
             .newInstance(
@@ -58,6 +58,7 @@ public class MRPartitioner implements org.apache.tez.runtime.library.api.Partiti
         };
       }
     } else {
+      newPartitioner = null;
       if (partitions > 1) {
         oldPartitioner = (org.apache.hadoop.mapred.Partitioner) ReflectionUtils.newInstance(
             (Class<? extends org.apache.hadoop.mapred.Partitioner>) conf.getClass(
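
The refactoring above relies on Java's blank-final rule: because newPartitioner and oldPartitioner are now final, every constructor path must assign both fields, so the unused one is explicitly set to null in each branch. A minimal sketch of the idiom (hypothetical class, illustrative only):

class EitherOr {
  private final String newImpl;
  private final String oldImpl;

  EitherOr(boolean useNew) {
    if (useNew) {
      oldImpl = null;   // blank final: must be assigned on every path
      newImpl = "new";
    } else {
      newImpl = null;
      oldImpl = "old";
    }
  }
}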

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/26ad7037/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
index f6fd802..a5bacd2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
@@ -25,6 +25,11 @@ public class TezIndexRecord {
 
   public TezIndexRecord() { }
 
+  /**
+   * @param startOffset start offset within the data file
+   * @param rawLength raw data length - typically uncompressed
+   * @param partLength actual data length in file - factors in checksums and compression
+   */
   public TezIndexRecord(long startOffset, long rawLength, long partLength) {
     this.startOffset = startOffset;
     this.rawLength = rawLength;
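
To make the three parameters concrete, a hedged example (all values hypothetical): a partition segment starting at file offset 0, holding 1 MB of serialized records and occupying roughly 410 KB on disk after compression and checksums, would be recorded as:

// Hypothetical values, for illustration only.
long startOffset = 0L;          // segment begins at file offset 0
long rawLength = 1024 * 1024;   // 1 MB of uncompressed key-value data
long partLength = 410000L;      // bytes actually occupied on disk
TezIndexRecord rec = new TezIndexRecord(startOffset, rawLength, partLength);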

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/26ad7037/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
new file mode 100644
index 0000000..f2dccd9
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
@@ -0,0 +1,161 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common.writers;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.Partitioner;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
+
+@SuppressWarnings("rawtypes")
+public abstract class BaseUnorderedPartitionedKVWriter implements KeyValueWriter {
+
+  private static final Log LOG = LogFactory.getLog(BaseUnorderedPartitionedKVWriter.class);
+  
+  protected final TezOutputContext outputContext;
+  protected final Configuration conf;
+  protected final Partitioner partitioner;
+  protected final Class keyClass;
+  protected final Class valClass;
+  protected final Serializer keySerializer;
+  protected final Serializer valSerializer;
+  protected final SerializationFactory serializationFactory;
+  protected final int numPartitions;
+  protected final CompressionCodec codec;
+  protected final TezTaskOutput outputFileHandler;
+  
+  protected final boolean ifileReadAhead;
+  protected final int ifileReadAheadLength;
+  protected final int ifileBufferSize;
+
+  /**
+   * Represents the serialized size of the output records. Does not consider
+   * overheads from the buffer meta-information, storage format, or compression
+   * if it is enabled.
+   */
+  protected final TezCounter outputRecordBytesCounter;
+  /**
+   * Represents final number of records written (spills are not counted)
+   */
+  protected final TezCounter outputRecordsCounter;
+  /**
+   * Represents the size of the final output - with any overheads introduced by
+   * meta-information.
+   */
+  protected final TezCounter outputBytesWithOverheadCounter;
+  
+  /**
+   * Represents the final output size, with file format overheads and compression factored in.
+   * Does not consider spills.
+   */
+  protected final TezCounter fileOutputBytesCounter;
+  /**
+   * Represents the additional records written to disk due to spills. Does not
+   * count the final write to disk.
+   */
+  protected final TezCounter spilledRecordsCounter;
+  /**
+   * Represents additional bytes written to disk as a result of spills, excluding the final spill.
+   */
+  protected final TezCounter additionalSpillBytesWrittenCounter;
+  /**
+   * Represents additional bytes read from disk to merge all the previous spills into a single file.
+   */
+  protected final TezCounter additionalSpillBytesReadCounter;
+  /**
+   * Represents the number of additional spills. The final spill is not counted towards this.
+   */
+  protected final TezCounter numAdditionalSpillsCounter;
+
+  @SuppressWarnings("unchecked")
+  public BaseUnorderedPartitionedKVWriter(TezOutputContext outputContext, Configuration conf, int numOutputs) {
+    this.outputContext = outputContext;
+    this.conf = conf;
+    this.numPartitions = numOutputs;
+    
+    // k/v serialization
+    keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
+    valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
+    serializationFactory = new SerializationFactory(this.conf);
+    keySerializer = serializationFactory.getSerializer(keyClass);
+    valSerializer = serializationFactory.getSerializer(valClass);
+    
+    outputRecordBytesCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+    outputRecordsCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
+    outputBytesWithOverheadCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
+    fileOutputBytesCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
+    spilledRecordsCounter = outputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
+    additionalSpillBytesWrittenCounter = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
+    additionalSpillBytesReadCounter = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
+    numAdditionalSpillsCounter = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
+    
+    // compression
+    if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
+      Class<? extends CompressionCodec> codecClass =
+          ConfigUtils.getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, this.conf);
+    } else {
+      codec = null;
+    }
+    
+    this.ifileReadAhead = this.conf.getBoolean(
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+    if (this.ifileReadAhead) {
+      this.ifileReadAheadLength = conf.getInt(
+          TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+          TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
+    } else {
+      this.ifileReadAheadLength = 0;
+    }
+    this.ifileBufferSize = conf.getInt("io.file.buffer.size",
+        TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
+    
+    LOG.info("Instantiating Partitioner: [" + conf.get(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS) + "]");
+    try {
+      this.partitioner = TezRuntimeUtils.instantiatePartitioner(this.conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    outputFileHandler = TezRuntimeUtils.instantiateTaskOutputManager(conf, outputContext);
+  }
+
+  @Override
+  public abstract void write(Object key, Object value) throws IOException;
+  
+  public abstract List<Event> close() throws IOException, InterruptedException;
+
+}
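
Concrete subclasses are consumed through the plain KeyValueWriter contract: repeated write() calls, then a single close() which returns the events to forward to the framework. A usage sketch (the drain() helper and its record map are hypothetical):

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.library.common.writers.BaseUnorderedPartitionedKVWriter;

class WriterUsageSketch {
  static List<Event> drain(BaseUnorderedPartitionedKVWriter writer,
      Map<String, String> records) throws IOException, InterruptedException {
    for (Map.Entry<String, String> e : records.entrySet()) {
      writer.write(e.getKey(), e.getValue()); // routed by the configured Partitioner
    }
    return writer.close(); // flush buffers, merge spills, emit data movement events
  }
}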

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/26ad7037/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
new file mode 100644
index 0000000..2d51a7d
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -0,0 +1,791 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.writers;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.ByteString;
+
+public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWriter {
+
+  private static final Log LOG = LogFactory.getLog(UnorderedPartitionedKVWriter.class);
+
+  private static final int INT_SIZE = 4;
+  private static final int NUM_META = 3; // Number of meta fields.
+  private static final int INDEX_KEYLEN = 0; // KeyLength index
+  private static final int INDEX_VALLEN = 1; // ValLength index
+  private static final int INDEX_NEXT = 2; // Next Record Index.
+  private static final int META_SIZE = NUM_META * INT_SIZE; // Size of total meta-data
+
+  private final static int APPROX_HEADER_LENGTH = 150;
+
+  // Maybe set up a separate statistics class which can be shared between the
+  // buffer and the main path instead of having multiple arrays.
+
+  private final long availableMemory;
+  @VisibleForTesting
+  final WrappedBuffer[] buffers;
+  @VisibleForTesting
+  final BlockingQueue<WrappedBuffer> availableBuffers;
+  private final ByteArrayOutputStream baos;
+  private final DataOutputStream dos;
+  @VisibleForTesting
+  WrappedBuffer currentBuffer;
+  private final FileSystem rfs;
+
+  private final List<SpillInfo> spillInfoList = Collections
+      .synchronizedList(new ArrayList<SpillInfo>());
+
+  private final ListeningExecutorService spillExecutor;
+
+  private final int[] numRecordsPerPartition;
+  private volatile long spilledSize = 0;
+
+  /**
+   * Represents the number of large records - those too big for any in-memory
+   * buffer - which are written directly to disk.
+   */
+  protected final TezCounter outputLargeRecordsCounter;
+
+  @VisibleForTesting
+  int numBuffers;
+  @VisibleForTesting
+  int sizePerBuffer;
+  @VisibleForTesting
+  int numInitializedBuffers;
+
+  private Throwable spillException;
+  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+  @VisibleForTesting
+  final AtomicInteger numSpills = new AtomicInteger(0);
+  private final AtomicInteger pendingSpillCount = new AtomicInteger(0);
+
+  private final ReentrantLock spillLock = new ReentrantLock();
+  private final Condition spillInProgress = spillLock.newCondition();
+
+  public UnorderedPartitionedKVWriter(TezOutputContext outputContext, Configuration conf,
+      int numOutputs, long availableMemoryBytes) throws IOException {
+    super(outputContext, conf, numOutputs);
+    Preconditions.checkArgument(availableMemoryBytes > 0, "availableMemory should be > 0 bytes");
+    // Ideally, should be significantly larger.
+    availableMemory = availableMemoryBytes;
+
+    // Allow unit tests to control the buffer sizes.
+    int maxSingleBufferSizeBytes = conf.getInt(
+        TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, Integer.MAX_VALUE);
+    computeNumBuffersAndSize(maxSingleBufferSizeBytes);
+    LOG.info("Running with numBuffers=" + numBuffers + ", sizePerBuffer=" + sizePerBuffer);
+    availableBuffers = new LinkedBlockingQueue<WrappedBuffer>();
+    buffers = new WrappedBuffer[numBuffers];
+    // Set up only the first buffer to start with.
+    buffers[0] = new WrappedBuffer(numOutputs, sizePerBuffer);
+    numInitializedBuffers = 1;
+    LOG.info("Initialized buffer #" + numInitializedBuffers + " with size=" + sizePerBuffer);
+    currentBuffer = buffers[0];
+    baos = new ByteArrayOutputStream();
+    dos = new DataOutputStream(baos);
+    keySerializer.open(dos);
+    valSerializer.open(dos);
+    rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw();
+
+    ExecutorService executor = Executors.newFixedThreadPool(
+        1,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat(
+                "UnorderedOutSpiller ["
+                    + TezUtils.cleanVertexName(outputContext.getDestinationVertexName()) + "]")
+            .build());
+    spillExecutor = MoreExecutors.listeningDecorator(executor);
+    numRecordsPerPartition = new int[numPartitions];
+
+    outputLargeRecordsCounter = outputContext.getCounters().findCounter(
+        TaskCounter.OUTPUT_LARGE_RECORDS);
+  }
+
+  private void computeNumBuffersAndSize(int bufferLimit) {
+    numBuffers = Math.max(2, (int) (availableMemory / bufferLimit)
+        + ((availableMemory % bufferLimit) == 0 ? 0 : 1));
+    sizePerBuffer = (int) (availableMemory / numBuffers);
+    sizePerBuffer = sizePerBuffer - (sizePerBuffer % INT_SIZE);
+  }
+
+  @Override
+  public void write(Object key, Object value) throws IOException {
+    // Skipping checks for key-value types. IFile performs these checks, though
+    // ideally they would be removed from there as well.
+
+    // How expensive are checks like these?
+    if (isShutdown.get()) {
+      throw new RuntimeException("Writer already closed");
+    }
+    if (spillException != null) {
+      // Already reported as a fatalError - report to the user code
+      throw new IOException("Exception during spill", new IOException(spillException));
+    }
+    int partition = partitioner.getPartition(key, value, numPartitions);
+    write(key, value, partition);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void write(Object key, Object value, int partition) throws IOException {
+    // Wrap to 4 byte (Int) boundary for metaData
+    int mod = currentBuffer.nextPosition % INT_SIZE;
+    int metaSkip = mod == 0 ? 0 : (INT_SIZE - mod);
+    if (currentBuffer.availableSize < (META_SIZE + metaSkip)) {
+      // Move over to the next buffer.
+      metaSkip = 0;
+      setupNextBuffer();
+    }
+    currentBuffer.nextPosition += metaSkip;
+    int metaStart = currentBuffer.nextPosition;
+    currentBuffer.availableSize -= (META_SIZE + metaSkip);
+    currentBuffer.nextPosition += META_SIZE;
+    try {
+      keySerializer.serialize(key);
+    } catch (BufferTooSmallException e) {
+      if (metaStart == 0) { // Started writing at the start of the buffer. Write Key to disk.
+        // Key too large for any buffer. Write entire record to disk.
+        currentBuffer.reset();
+        writeLargeRecord(key, value, partition, numSpills.incrementAndGet());
+        return;
+      } else { // Exceeded length on current buffer.
+        // Try resetting the buffer to the next one, if this was not the start of a buffer,
+        // and begin spilling the current buffer to disk if it has any records.
+        setupNextBuffer();
+        write(key, value, partition);
+        return;
+      }
+    }
+    int valStart = currentBuffer.nextPosition;
+    try {
+      valSerializer.serialize(value);
+    } catch (BufferTooSmallException e) {
+      // Value too large for current buffer, or K-V too large for entire buffer.
+      if (metaStart == 0) {
+        // Key + Value too large for a single buffer.
+        currentBuffer.reset();
+        writeLargeRecord(key, value, partition, numSpills.incrementAndGet());
+        return;
+      } else { // Exceeded length on current buffer.
+        // Try writing key+value to a new buffer - will fall back to disk if that fails.
+        setupNextBuffer();
+        write(key, value, partition);
+        return;
+      }
+    }
+
+    // Meta-data updates
+    int metaIndex = metaStart / INT_SIZE;
+    int indexNext = currentBuffer.partitionPositions[partition];
+
+    currentBuffer.metaBuffer.put(metaIndex + INDEX_KEYLEN, (valStart - (metaStart + META_SIZE)));
+    currentBuffer.metaBuffer.put(metaIndex + INDEX_VALLEN, (currentBuffer.nextPosition - valStart));
+    currentBuffer.metaBuffer.put(metaIndex + INDEX_NEXT, indexNext);
+    currentBuffer.skipSize += metaSkip; // For size estimation
+    // Update stats on number of records
+    outputRecordBytesCounter.increment(currentBuffer.nextPosition - (metaStart + META_SIZE));
+    outputBytesWithOverheadCounter.increment((currentBuffer.nextPosition - metaStart) + metaSkip);
+    outputRecordsCounter.increment(1);
+    currentBuffer.partitionPositions[partition] = metaStart;
+    currentBuffer.recordsPerPartition[partition]++;
+    currentBuffer.numRecords++;
+
+  }
+
+  private void setupNextBuffer() throws IOException {
+
+    if (currentBuffer.numRecords == 0) {
+      currentBuffer.reset();
+    } else {
+      // Update overall stats
+      LOG.info("Moving to next buffer and triggering spill");
+      updateGlobalStats(currentBuffer);
+
+      pendingSpillCount.incrementAndGet();
+
+      ListenableFuture<SpillResult> future = spillExecutor.submit(new SpillCallable(currentBuffer,
+          numSpills.incrementAndGet(), codec, spilledRecordsCounter, false));
+      Futures.addCallback(future, new SpillCallback(numSpills.get()));
+
+      WrappedBuffer wb = getNextAvailableBuffer();
+      currentBuffer = wb;
+    }
+  }
+
+  private void updateGlobalStats(WrappedBuffer buffer) {
+    for (int i = 0; i < numPartitions; i++) {
+      numRecordsPerPartition[i] += buffer.recordsPerPartition[i];
+    }
+  }
+
+  private WrappedBuffer getNextAvailableBuffer() throws IOException {
+    if (availableBuffers.peek() == null) {
+      if (numInitializedBuffers < numBuffers) {
+        buffers[numInitializedBuffers] = new WrappedBuffer(numPartitions, sizePerBuffer);
+        numInitializedBuffers++;
+        return buffers[numInitializedBuffers - 1];
+      } else {
+        // All buffers initialized, and none available right now. Wait
+        try {
+          return availableBuffers.take();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted while waiting for next buffer", e);
+        }
+      }
+    } else {
+      return availableBuffers.poll();
+    }
+  }
+
+  // All spills using compression for now.
+  private class SpillCallable implements Callable<SpillResult> {
+
+    private final WrappedBuffer wrappedBuffer;
+    private final CompressionCodec codec;
+    private final TezCounter numRecordsCounter;
+    private final int spillNumber;
+    private final boolean isFinalSpill;
+
+    public SpillCallable(WrappedBuffer wrappedBuffer, int spillNumber, CompressionCodec codec,
+        TezCounter numRecordsCounter, boolean isFinal) {
+      this.wrappedBuffer = wrappedBuffer;
+      this.codec = codec;
+      this.numRecordsCounter = numRecordsCounter;
+      this.spillNumber = spillNumber;
+      this.isFinalSpill = isFinal;
+    }
+
+    @Override
+    public SpillResult call() throws IOException {
+      // This should not be called with an empty buffer. Check before invoking.
+
+      // Number of parallel spills determined by number of threads.
+      // Last spill synchronization handled separately.
+      SpillResult spillResult = null;
+      long spillSize = wrappedBuffer.nextPosition + numPartitions * APPROX_HEADER_LENGTH;
+      Path outPath = null;
+      if (isFinalSpill) {
+        outPath = outputFileHandler.getOutputFileForWrite(spillSize);
+      } else {
+        outPath = outputFileHandler.getSpillFileForWrite(spillNumber, spillSize);
+      }
+      FSDataOutputStream out = rfs.create(outPath);
+      TezSpillRecord spillRecord = new TezSpillRecord(numPartitions);
+      DataInputBuffer key = new DataInputBuffer();
+      DataInputBuffer val = new DataInputBuffer();
+      for (int i = 0; i < numPartitions; i++) {
+        IFile.Writer writer = null;
+        try {
+          long segmentStart = out.getPos();
+          if (wrappedBuffer.partitionPositions[i] == WrappedBuffer.PARTITION_ABSENT_POSITION) {
+            // Skip empty partition.
+            continue;
+          }
+          writer = new Writer(conf, out, keyClass, valClass, codec, numRecordsCounter, null);
+          writePartition(wrappedBuffer.partitionPositions[i], wrappedBuffer, writer, key, val);
+          writer.close();
+          if (isFinalSpill) {
+            fileOutputBytesCounter.increment(writer.getCompressedLength());
+          } else {
+            additionalSpillBytesWrittenCounter.increment(writer.getCompressedLength());
+          }
+          spillResult = new SpillResult(writer.getCompressedLength(), this.wrappedBuffer);
+          TezIndexRecord indexRecord = new TezIndexRecord(segmentStart, writer.getRawLength(),
+              writer.getCompressedLength());
+          spillRecord.putIndex(indexRecord, i);
+          writer = null;
+        } finally {
+          if (writer != null) {
+            writer.close();
+          }
+        }
+      }
+      if (isFinalSpill) {
+        long indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+        Path finalSpillFile = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
+        spillRecord.writeToFile(finalSpillFile, conf);
+        fileOutputBytesCounter.increment(indexFileSizeEstimate);
+        LOG.info("Finished final and only spill");
+      } else {
+        SpillInfo spillInfo = new SpillInfo(spillRecord, outPath);
+        spillInfoList.add(spillInfo);
+        numAdditionalSpillsCounter.increment(1);
+        LOG.info("Finished spill " + spillNumber);
+      }
+      return spillResult;
+    }
+  }
+
+  private void writePartition(int pos, WrappedBuffer wrappedBuffer, Writer writer,
+      DataInputBuffer keyBuffer, DataInputBuffer valBuffer) throws IOException {
+    while (pos != WrappedBuffer.PARTITION_ABSENT_POSITION) {
+      int metaIndex = pos / INT_SIZE;
+      int keyLength = wrappedBuffer.metaBuffer.get(metaIndex + INDEX_KEYLEN);
+      int valLength = wrappedBuffer.metaBuffer.get(metaIndex + INDEX_VALLEN);
+      keyBuffer.reset(wrappedBuffer.buffer, pos + META_SIZE, keyLength);
+      valBuffer.reset(wrappedBuffer.buffer, pos + META_SIZE + keyLength, valLength);
+
+      writer.append(keyBuffer, valBuffer);
+      pos = wrappedBuffer.metaBuffer.get(metaIndex + INDEX_NEXT);
+    }
+  }
+
+  public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
+    int initialMemRequestMb = conf.getInt(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB,
+        TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB_DEFAULT);
+    Preconditions.checkArgument(initialMemRequestMb > 0,
+        TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB + " should be larger than 0");
+    long reqBytes = initialMemRequestMb << 20;
+    LOG.info("Requested BufferSize (" + TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB
+        + ") : " + initialMemRequestMb);
+    return reqBytes;
+  }
+
+  @Override
+  public List<Event> close() throws IOException, InterruptedException {
+    isShutdown.set(true);
+    spillLock.lock();
+    LOG.info("Waiting for all spills to complete : Pending : " + pendingSpillCount.get());
+    try {
+      while (pendingSpillCount.get() != 0 && spillException == null) {
+        spillInProgress.await();
+      }
+    } finally {
+      spillLock.unlock();
+    }
+    if (spillException != null) {
+      LOG.fatal("Error during spill, throwing");
+      // Assuming close will be called on the same thread as the write
+      cleanup();
+      currentBuffer.cleanup();
+      currentBuffer = null;
+      if (spillException instanceof IOException) {
+        throw (IOException) spillException;
+      } else {
+        throw new IOException(spillException);
+      }
+    } else {
+      LOG.info("All spills complete");
+      // Assuming close will be called on the same thread as the write
+      cleanup();
+      if (numSpills.get() > 0) {
+        mergeAll();
+      } else {
+        finalSpill();
+      }
+
+      currentBuffer.cleanup();
+      currentBuffer = null;
+    }
+
+    return Collections.singletonList(generateEvent());
+  }
+
+  private void cleanup() {
+    if (spillExecutor != null) {
+      spillExecutor.shutdownNow();
+    }
+    for (int i = 0; i < buffers.length; i++) {
+      if (buffers[i] != null && buffers[i] != currentBuffer) {
+        buffers[i].cleanup();
+        buffers[i] = null;
+      }
+    }
+    availableBuffers.clear();
+  }
+
+  private Event generateEvent() throws IOException {
+    DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
+        .newBuilder();
+
+    String host = getHost();
+    int shufflePort = getShufflePort();
+
+    BitSet emptyPartitions = new BitSet();
+    for (int i = 0; i < numPartitions; i++) {
+      if (numRecordsPerPartition[i] == 0) {
+        emptyPartitions.set(i);
+      }
+    }
+    if (emptyPartitions.cardinality() != 0) {
+      // Empty partitions exist
+      ByteString emptyPartitionsByteString = TezUtils.compressByteArrayToByteString(TezUtils
+          .toByteArray(emptyPartitions));
+      payloadBuilder.setEmptyPartitions(emptyPartitionsByteString);
+    }
+    if (emptyPartitions.cardinality() != numPartitions) {
+      // Populate payload only if at least 1 partition has data
+      payloadBuilder.setHost(host);
+      payloadBuilder.setPort(shufflePort);
+      payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
+    }
+
+    CompositeDataMovementEvent cDme = new CompositeDataMovementEvent(0, numPartitions,
+        payloadBuilder.build().toByteArray());
+    return cDme;
+  }
+
+  private void finalSpill() throws IOException {
+    if (currentBuffer.nextPosition == 0) {
+      return;
+    } else {
+      updateGlobalStats(currentBuffer);
+      SpillCallable spillCallable = new SpillCallable(currentBuffer, 0, codec, null, true);
+      spillCallable.call();
+      return;
+    }
+
+  }
+
+  private void mergeAll() throws IOException {
+    long expectedSize = spilledSize;
+    if (currentBuffer.nextPosition != 0) {
+      expectedSize += currentBuffer.nextPosition - (currentBuffer.numRecords * META_SIZE)
+          - currentBuffer.skipSize + numPartitions * APPROX_HEADER_LENGTH;
+      // Update final statistics.
+      updateGlobalStats(currentBuffer);
+    }
+
+    long indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+    Path finalOutPath = outputFileHandler.getOutputFileForWrite(expectedSize);
+    Path finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
+
+    TezSpillRecord finalSpillRecord = new TezSpillRecord(numPartitions);
+
+    DataInputBuffer keyBuffer = new DataInputBuffer();
+    DataInputBuffer valBuffer = new DataInputBuffer();
+
+    DataInputBuffer keyBufferIFile = new DataInputBuffer();
+    DataInputBuffer valBufferIFile = new DataInputBuffer();
+
+    FSDataOutputStream out = null;
+    try {
+      out = rfs.create(finalOutPath);
+      Writer writer = null;
+
+      for (int i = 0; i < numPartitions; i++) {
+        long segmentStart = out.getPos();
+        if (numRecordsPerPartition[i] == 0) {
+          LOG.info("Skipping partition: " + i + " in final merge since it has no records");
+          continue;
+        }
+        writer = new Writer(conf, out, keyClass, valClass, codec, null, null);
+        try {
+          if (currentBuffer.nextPosition != 0
+              && currentBuffer.partitionPositions[i] != WrappedBuffer.PARTITION_ABSENT_POSITION) {
+            // Write current buffer.
+            writePartition(currentBuffer.partitionPositions[i], currentBuffer, writer, keyBuffer,
+                valBuffer);
+          }
+          synchronized (spillInfoList) {
+            for (SpillInfo spillInfo : spillInfoList) {
+              TezIndexRecord indexRecord = spillInfo.spillRecord.getIndex(i);
+              if (indexRecord.getPartLength() == 0) {
+                // Skip empty partitions within a spill
+                continue;
+              }
+              FSDataInputStream in = rfs.open(spillInfo.outPath);
+              in.seek(indexRecord.getStartOffset());
+              IFile.Reader reader = new IFile.Reader(in, indexRecord.getPartLength(), codec, null,
+                  additionalSpillBytesReadCounter, ifileReadAhead, ifileReadAheadLength,
+                  ifileBufferSize);
+              while (reader.nextRawKey(keyBufferIFile)) {
+                // TODO Inefficient. If spills are not compressed, a direct copy should be possible
+                // given the current IFile format. Also extremely inefficient for large records,
+                // since the entire record will be read into memory.
+                reader.nextRawValue(valBufferIFile);
+                writer.append(keyBufferIFile, valBufferIFile);
+              }
+              reader.close();
+            }
+          }
+          writer.close();
+          fileOutputBytesCounter.increment(writer.getCompressedLength());
+          TezIndexRecord indexRecord = new TezIndexRecord(segmentStart, writer.getRawLength(),
+              writer.getCompressedLength());
+          writer = null;
+          finalSpillRecord.putIndex(indexRecord, i);
+        } finally {
+          if (writer != null) {
+            writer.close();
+          }
+        }
+      }
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+    finalSpillRecord.writeToFile(finalIndexPath, conf);
+    fileOutputBytesCounter.increment(indexFileSizeEstimate);
+    LOG.info("Finished final spill after merging : " + numSpills.get() + " spills");
+  }
+
+  private void writeLargeRecord(final Object key, final Object value, final int partition,
+      final int spillNumber) throws IOException {
+    numAdditionalSpillsCounter.increment(1);
+    long size = sizePerBuffer - (currentBuffer.numRecords * META_SIZE) - currentBuffer.skipSize
+        + numPartitions * APPROX_HEADER_LENGTH;
+    FSDataOutputStream out = null;
+    long outSize = 0;
+    try {
+      final TezSpillRecord spillRecord = new TezSpillRecord(numPartitions);
+      final Path outPath = outputFileHandler.getSpillFileForWrite(spillNumber, size);
+      out = rfs.create(outPath);
+      for (int i = 0; i < numPartitions; i++) {
+        final long recordStart = out.getPos();
+        if (i == partition) {
+          spilledRecordsCounter.increment(1);
+          Writer writer = null;
+          try {
+            writer = new IFile.Writer(conf, out, keyClass, valClass, codec, null, null);
+            writer.append(key, value);
+            outputLargeRecordsCounter.increment(1);
+            numRecordsPerPartition[i]++;
+            writer.close();
+            additionalSpillBytesWritternCounter.increment(writer.getCompressedLength());
+            TezIndexRecord indexRecord = new TezIndexRecord(recordStart, writer.getRawLength(),
+                writer.getCompressedLength());
+            spillRecord.putIndex(indexRecord, i);
+            outSize = writer.getCompressedLength();
+            writer = null;
+          } finally {
+            if (writer != null) {
+              writer.close();
+            }
+          }
+        }
+      }
+      SpillInfo spillInfo = new SpillInfo(spillRecord, outPath);
+      spillInfoList.add(spillInfo);
+      LOG.info("Finished writing large record of size " + outSize + " to spill file " + spillNumber);
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
+  private class ByteArrayOutputStream extends OutputStream {
+
+    private final byte[] scratch = new byte[1];
+
+    @Override
+    public void write(int v) throws IOException {
+      scratch[0] = (byte) v;
+      write(scratch, 0, 1);
+    }
+
+    public void write(byte[] b, int off, int len) throws IOException {
+      if (len > currentBuffer.availableSize) {
+        throw new BufferTooSmallException();
+      } else {
+        System.arraycopy(b, off, currentBuffer.buffer, currentBuffer.nextPosition, len);
+        currentBuffer.nextPosition += len;
+        currentBuffer.availableSize -= len;
+      }
+    }
+  }
+
+  private static class WrappedBuffer {
+
+    private static final int PARTITION_ABSENT_POSITION = -1;
+
+    private final int[] partitionPositions;
+    private final int[] recordsPerPartition;
+    private final int numPartitions;
+    private final int size;
+
+    private byte[] buffer;
+    private IntBuffer metaBuffer;
+
+    private int numRecords = 0;
+    private int skipSize = 0;
+
+    private int nextPosition = 0;
+    private int availableSize;
+
+    WrappedBuffer(int numPartitions, int size) {
+      this.partitionPositions = new int[numPartitions];
+      this.recordsPerPartition = new int[numPartitions];
+      this.numPartitions = numPartitions;
+      for (int i = 0; i < numPartitions; i++) {
+        this.partitionPositions[i] = PARTITION_ABSENT_POSITION;
+        this.recordsPerPartition[i] = 0;
+      }
+      size = size - (size % INT_SIZE);
+      this.size = size;
+      this.buffer = new byte[size];
+      this.metaBuffer = ByteBuffer.wrap(buffer).order(ByteOrder.nativeOrder()).asIntBuffer();
+      availableSize = size;
+    }
+
+    void reset() {
+      for (int i = 0; i < numPartitions; i++) {
+        this.partitionPositions[i] = PARTITION_ABSENT_POSITION;
+        this.recordsPerPartition[i] = 0;
+      }
+      numRecords = 0;
+      nextPosition = 0;
+      skipSize = 0;
+      availableSize = size;
+    }
+
+    void cleanup() {
+      buffer = null;
+      metaBuffer = null;
+    }
+  }
+
+  private static class BufferTooSmallException extends IOException {
+    private static final long serialVersionUID = 1L;
+  }
+
+  private class SpillCallback implements FutureCallback<SpillResult> {
+
+    private final int spillNumber;
+
+    SpillCallback(int spillNumber) {
+      this.spillNumber = spillNumber;
+    }
+
+    @Override
+    public void onSuccess(SpillResult result) {
+      LOG.info("Spill# " + spillNumber + " complete.");
+      spilledSize += result.spillSize;
+      try {
+        result.wrappedBuffer.reset();
+        availableBuffers.add(result.wrappedBuffer);
+
+      } catch (Throwable e) {
+        LOG.fatal("Failure while attempting to reset buffer after spill", e);
+        outputContext.fatalError(e, "Failure while attempting to reset buffer after spill");
+      }
+
+      spillLock.lock();
+      try {
+        if (pendingSpillCount.decrementAndGet() == 0) {
+          spillInProgress.signal();
+        }
+      } finally {
+        spillLock.unlock();
+      }
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      // spillException is set up to propagate the failure back to the user code. Requires
+      // synchronization. Consider removing it in favor of having Tez kill the task.
+      LOG.fatal("Failure while spilling to disk", t);
+      spillException = t;
+      outputContext.fatalError(t, "Failure while spilling to disk");
+      spillLock.lock();
+      try {
+        spillInProgress.signal();
+      } finally {
+        spillLock.unlock();
+      }
+    }
+  }
+
+  private static class SpillResult {
+    final long spillSize;
+    final WrappedBuffer wrappedBuffer;
+
+    SpillResult(long size, WrappedBuffer wrappedBuffer) {
+      this.spillSize = size;
+      this.wrappedBuffer = wrappedBuffer;
+    }
+  }
+
+  private static class SpillInfo {
+    final TezSpillRecord spillRecord;
+    final Path outPath;
+
+    SpillInfo(TezSpillRecord spillRecord, Path outPath) {
+      this.spillRecord = spillRecord;
+      this.outPath = outPath;
+    }
+  }
+
+  @VisibleForTesting
+  String getHost() {
+    return System.getenv(ApplicationConstants.Environment.NM_HOST.toString());
+  }
+
+  @VisibleForTesting
+  int getShufflePort() throws IOException {
+    ByteBuffer shuffleMetadata = outputContext
+        .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+    int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
+    return shufflePort;
+  }
+}
\ No newline at end of file
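
The central data structure above is the WrappedBuffer: a byte[] holding serialized records, aliased by an IntBuffer for the three-int metadata header (key length, value length, and the position of the previous record belonging to the same partition). partitionPositions[] stores the head of each partition's chain, so records of one partition form a linked list threaded backwards through the buffer, and writePartition() simply walks that list. With the default 100 MB request and no per-buffer cap, computeNumBuffersAndSize() yields two buffers of roughly 50 MB each, letting one fill while the other spills. A self-contained sketch of the same chaining scheme, simplified to metadata only (no serialized payloads):

import java.util.Arrays;

// Simplified model of WrappedBuffer's per-partition chaining. Each entry is
// [keyLen, valLen, prevPosition]; heads[] mirrors partitionPositions[].
class ChainSketch {
  static final int ABSENT = -1;   // PARTITION_ABSENT_POSITION equivalent
  final int[] meta = new int[64];
  final int[] heads;
  int next = 0;                   // next free metadata slot

  ChainSketch(int numPartitions) {
    heads = new int[numPartitions];
    Arrays.fill(heads, ABSENT);
  }

  void add(int partition, int keyLen, int valLen) {
    int pos = next;
    meta[pos] = keyLen;
    meta[pos + 1] = valLen;
    meta[pos + 2] = heads[partition]; // link to the previous record, if any
    heads[partition] = pos;           // this record becomes the new chain head
    next += 3;
  }

  void dump(int partition) {          // walk the chain, newest record first
    for (int pos = heads[partition]; pos != ABSENT; pos = meta[pos + 2]) {
      System.out.println("keyLen=" + meta[pos] + " valLen=" + meta[pos + 1]);
    }
  }

  public static void main(String[] args) {
    ChainSketch s = new ChainSketch(2);
    s.add(0, 4, 10);
    s.add(1, 3, 7);
    s.add(0, 5, 2);
    s.dump(0); // prints the two partition-0 records, newest first
  }
}

Note that the chain is walked newest-first, so records within a partition land on disk in reverse insertion order; since this output makes no ordering guarantees, that is acceptable.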

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/26ad7037/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
index ff2af32..b800039 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
@@ -18,18 +18,88 @@
 
 package org.apache.tez.runtime.library.output;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.Writer;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter;
+
+import com.google.common.base.Preconditions;
 
 /**
- * <code>OnFileUnorderedPartitionedKVOutput</code> is a {@link LogicalOutput}
- * which can be used to write Key-Value pairs. The key-value pairs are written
- * to the correct partition based on the configured Partitioner.
+ * <code>OnFileUnorderedPartitionedKVOutput</code> is a {@link LogicalOutput} which can be used to
+ * write Key-Value pairs. The key-value pairs are written to the correct partition based on the
+ * configured Partitioner.
  * 
  * This currently acts as a usable placeholder for writing unordered output (the data is sorted,
  * which should be functionally correct since there's no guarantees on order without a sort).
  * TEZ-661 to add a proper implementation.
  * 
  */
-public class OnFileUnorderedPartitionedKVOutput extends OnFileSortedOutput {
+public class OnFileUnorderedPartitionedKVOutput implements LogicalOutput {
+
+  private static final Log LOG = LogFactory.getLog(OnFileUnorderedPartitionedKVOutput.class);
+
+  private TezOutputContext outputContext;
+  private Configuration conf;
+  private int numPhysicalOutputs;
+  private MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
+  private UnorderedPartitionedKVWriter kvWriter;
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+  @Override
+  public synchronized List<Event> initialize(TezOutputContext outputContext) throws Exception {
+    this.outputContext = outputContext;
+    this.conf = TezUtils.createConfFromUserPayload(outputContext.getUserPayload());
+    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, outputContext.getWorkDirs());
+    this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+    outputContext.requestInitialMemory(
+        UnorderedPartitionedKVWriter.getInitialMemoryRequirement(conf,
+            outputContext.getTotalMemoryAvailableToTask()), memoryUpdateCallbackHandler);
+    return Collections.emptyList();
+  }
+
+  @Override
+  public synchronized void start() throws Exception {
+    if (!isStarted.get()) {
+      memoryUpdateCallbackHandler.validateUpdateReceived();
+      this.kvWriter = new UnorderedPartitionedKVWriter(outputContext, conf, numPhysicalOutputs,
+          memoryUpdateCallbackHandler.getMemoryAssigned());
+      isStarted.set(true);
+    }
+  }
+
+  @Override
+  public synchronized Writer getWriter() throws Exception {
+    Preconditions.checkState(isStarted.get(), "Cannot get writer before starting the Output");
+    return kvWriter;
+  }
+
+  @Override
+  public void handleEvents(List<Event> outputEvents) {
+  }
+
+  @Override
+  public synchronized List<Event> close() throws Exception {
+    if (isStarted.get()) {
+      return kvWriter.close();
+    } else {
+      return Collections.emptyList();
+    }
+  }
 
+  @Override
+  public synchronized void setNumPhysicalOutputs(int numOutputs) {
+    this.numPhysicalOutputs = numOutputs;
+  }
 }
\ No newline at end of file
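
For context, the Output follows the standard LogicalOutput lifecycle: initialize() registers an initial memory request, the framework answers through the MemoryUpdateCallbackHandler, start() then builds the writer with the assigned budget, and close() delegates to the writer. A sketch of the sequence as the runtime would drive it (the run() wrapper is hypothetical; error handling elided):

import java.util.List;

import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;

class OutputLifecycleSketch {
  static List<Event> run(TezOutputContext context) throws Exception {
    OnFileUnorderedPartitionedKVOutput output = new OnFileUnorderedPartitionedKVOutput();
    output.setNumPhysicalOutputs(10); // one partition per downstream task
    output.initialize(context);       // registers the initial memory request
    // The framework delivers the memory assignment before start() runs.
    output.start();                   // constructs the UnorderedPartitionedKVWriter
    KeyValueWriter writer = (KeyValueWriter) output.getWriter();
    writer.write("key", "value");     // types must match the configured K/V classes
    return output.close();            // returns the CompositeDataMovementEvent
  }
}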

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/26ad7037/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/partitioner/HashPartitioner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/partitioner/HashPartitioner.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/partitioner/HashPartitioner.java
new file mode 100644
index 0000000..d95a654
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/partitioner/HashPartitioner.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.partitioner;
+
+import org.apache.tez.runtime.library.api.Partitioner;
+
+public class HashPartitioner implements Partitioner {
+
+  @Override
+  public int getPartition(Object key, Object value, int numPartitions) {
+    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
+  }
+
+}
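
The & Integer.MAX_VALUE mask clears the sign bit, so keys whose hashCode() is negative still map into [0, numPartitions); Math.abs() would not work here, since Math.abs(Integer.MIN_VALUE) is still negative. A quick standalone check (the MaskDemo class is illustrative, not part of this patch):

class MaskDemo {
  public static void main(String[] args) {
    int numPartitions = 7;
    int h = Integer.MIN_VALUE; // the one hashCode Math.abs() cannot fix
    int p = (h & Integer.MAX_VALUE) % numPartitions;
    System.out.println(p);     // prints 0; always within [0, numPartitions)
  }
}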

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/26ad7037/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
index c295601..f7dfe0a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
@@ -35,6 +35,7 @@ import org.apache.tez.runtime.library.input.ShuffledMergedInput;
 import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
 import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -185,6 +186,8 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
       requestType = RequestType.SORTED_MERGED_INPUT;
     } else if (className.equals(ShuffledUnorderedKVInput.class.getName())) {
       requestType = RequestType.UNSORTED_INPUT;
+    } else if (className.equals(OnFileUnorderedPartitionedKVOutput.class.getName())) {
+      requestType = RequestType.PARTITIONED_UNSORTED_OUTPUT;
     } else {
       requestType = RequestType.OTHER;
       LOG.info("Falling back to RequestType.OTHER for class: " + className);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/26ad7037/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
new file mode 100644
index 0000000..8b51765
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -0,0 +1,673 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.writers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.library.api.Partitioner;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+@RunWith(value = Parameterized.class)
+public class TestUnorderedPartitionedKVWriter {
+
+  private static final Log LOG = LogFactory.getLog(TestUnorderedPartitionedKVWriter.class);
+
+  private static final String HOST_STRING = "localhost";
+  private static final int SHUFFLE_PORT = 4000;
+
+  private static String testTmpDir = System.getProperty("test.build.data", "/tmp");
+  private static final Path TEST_ROOT_DIR = new Path(testTmpDir,
+      TestUnorderedPartitionedKVWriter.class.getSimpleName());
+  private static FileSystem localFs;
+
+  private boolean shouldCompress;
+
+  public TestUnorderedPartitionedKVWriter(boolean shouldCompress) {
+    this.shouldCompress = shouldCompress;
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    Object[][] data = new Object[][] { { false }, { true } };
+    return Arrays.asList(data);
+  }
+
+  @Before
+  public void setup() throws IOException {
+    LOG.info("Setup. Using test dir: " + TEST_ROOT_DIR);
+    localFs = FileSystem.getLocal(new Configuration());
+    localFs.delete(TEST_ROOT_DIR, true);
+    localFs.mkdirs(TEST_ROOT_DIR);
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    LOG.info("CleanUp");
+    localFs.delete(TEST_ROOT_DIR, true);
+  }
+
+  @Test(timeout = 10000)
+  public void testBufferSizing() throws IOException {
+    ApplicationId appId = ApplicationId.newInstance(10000, 1);
+    TezCounters counters = new TezCounters();
+    String uniqueId = UUID.randomUUID().toString();
+    TezOutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
+
+    int maxSingleBufferSizeBytes = 2047;
+    Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
+        false, maxSingleBufferSizeBytes);
+
+    int numOutputs = 10;
+
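+    // The expected values below match a presumed sizing rule:
+    //   numBuffers = ceil(availableMemory / maxSingleBufferSizeBytes)
+    //   sizePerBuffer = (availableMemory / numBuffers) rounded down to a multiple of 4
+    // e.g. 10240 bytes with a 2047 byte cap: ceil(10240 / 2047) = 6 buffers,
+    // and 10240 / 6 = 1706 rounds down to 1704 bytes per buffer.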
+    UnorderedPartitionedKVWriter kvWriter = null;
+
+    kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, 2048);
+    assertEquals(2, kvWriter.numBuffers);
+    assertEquals(1024, kvWriter.sizePerBuffer);
+    assertEquals(1, kvWriter.numInitializedBuffers);
+
+    kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs,
+        maxSingleBufferSizeBytes * 3);
+    assertEquals(3, kvWriter.numBuffers);
+    assertEquals(maxSingleBufferSizeBytes - maxSingleBufferSizeBytes % 4, kvWriter.sizePerBuffer);
+    assertEquals(1, kvWriter.numInitializedBuffers);
+
+    kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs,
+        maxSingleBufferSizeBytes * 2 + 1);
+    assertEquals(3, kvWriter.numBuffers);
+    assertEquals(1364, kvWriter.sizePerBuffer);
+    assertEquals(1, kvWriter.numInitializedBuffers);
+
+    kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, 10240);
+    assertEquals(6, kvWriter.numBuffers);
+    assertEquals(1704, kvWriter.sizePerBuffer);
+    assertEquals(1, kvWriter.numInitializedBuffers);
+  }
+
+  @Test(timeout = 10000)
+  public void testNoSpill() throws IOException, InterruptedException {
+    baseTest(10, 10, null, shouldCompress);
+  }
+
+  @Test(timeout = 10000)
+  public void testSingleSpill() throws IOException, InterruptedException {
+    baseTest(50, 10, null, shouldCompress);
+  }
+
+  @Test(timeout = 10000)
+  public void testMultipleSpills() throws IOException, InterruptedException {
+    baseTest(200, 10, null, shouldCompress);
+  }
+
+  @Test(timeout = 10000)
+  public void testNoRecords() throws IOException, InterruptedException {
+    baseTest(0, 10, null, shouldCompress);
+  }
+
+  @Test(timeout = 10000)
+  public void testSkippedPartitions() throws IOException, InterruptedException {
+    baseTest(200, 10, Sets.newHashSet(2, 5), shouldCompress);
+  }
+
+  @Test(timeout = 10000)
+  public void testRandomText() throws IOException, InterruptedException {
+    textTest(100, 10, 2048, 0, 0, 0);
+  }
+
+  @Test(timeout = 10000)
+  public void testLargeKeys() throws IOException, InterruptedException {
+    textTest(0, 10, 2048, 10, 0, 0);
+  }
+
+  @Test(timeout = 10000)
+  public void testLargeValues() throws IOException, InterruptedException {
+    textTest(0, 10, 2048, 0, 10, 0);
+  }
+
+  @Test(timeout = 10000)
+  public void testLargeKvPairs() throws IOException, InterruptedException {
+    textTest(0, 10, 2048, 0, 0, 10);
+  }
+
+  @Test(timeout = 10000)
+  public void testTextMixedRecords() throws IOException, InterruptedException {
+    textTest(100, 10, 2048, 10, 10, 10);
+  }
+
+  public void textTest(int numRegularRecords, int numPartitions, long availableMemory,
+      int numLargeKeys, int numLargeValues, int numLargeKvPairs) throws IOException,
+      InterruptedException {
+    Partitioner partitioner = new HashPartitioner();
+    ApplicationId appId = ApplicationId.newInstance(10000, 1);
+    TezCounters counters = new TezCounters();
+    String uniqueId = UUID.randomUUID().toString();
+    TezOutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
+    Random random = new Random();
+
+    Configuration conf = createConfiguration(outputContext, Text.class, Text.class, shouldCompress,
+        -1, HashPartitioner.class);
+    CompressionCodec codec = null;
+    if (shouldCompress) {
+      codec = new DefaultCodec();
+      ((Configurable) codec).setConf(conf);
+    }
+
+    int numRecordsWritten = 0;
+
+    Map<Integer, Multimap<String, String>> expectedValues = new HashMap<Integer, Multimap<String, String>>();
+    for (int i = 0; i < numPartitions; i++) {
+      expectedValues.put(i, LinkedListMultimap.<String, String> create());
+    }
+
+    UnorderedPartitionedKVWriter kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext,
+        conf, numPartitions, availableMemory);
+
+    int sizePerBuffer = kvWriter.sizePerBuffer;
+
+    BitSet partitionsWithData = new BitSet(numPartitions);
+    Text keyText = new Text();
+    Text valText = new Text();
+    for (int i = 0; i < numRegularRecords; i++) {
+      String key = createRandomString(Math.abs(random.nextInt(10)));
+      String val = createRandomString(Math.abs(random.nextInt(20)));
+      keyText.set(key);
+      valText.set(val);
+      int partition = partitioner.getPartition(keyText, valText, numPartitions);
+      partitionsWithData.set(partition);
+      expectedValues.get(partition).put(key, val);
+      kvWriter.write(keyText, valText);
+      numRecordsWritten++;
+    }
+
+    // Write large-key records
+    for (int i = 0; i < numLargeKeys; i++) {
+      String key = createRandomString(sizePerBuffer + Math.abs(random.nextInt(100)));
+      String val = createRandomString(Math.abs(random.nextInt(20)));
+      keyText.set(key);
+      valText.set(val);
+      int partition = partitioner.getPartition(keyText, valText, numPartitions);
+      partitionsWithData.set(partition);
+      expectedValues.get(partition).put(key, val);
+      kvWriter.write(keyText, valText);
+      numRecordsWritten++;
+    }
+
+    // Write large-value records
+    for (int i = 0; i < numLargeValues; i++) {
+      String key = createRandomString(Math.abs(random.nextInt(10)));
+      String val = createRandomString(sizePerBuffer + Math.abs(random.nextInt(100)));
+      keyText.set(key);
+      valText.set(val);
+      int partition = partitioner.getPartition(keyText, valText, numPartitions);
+      partitionsWithData.set(partition);
+      expectedValues.get(partition).put(key, val);
+      kvWriter.write(keyText, valText);
+      numRecordsWritten++;
+    }
+
+    // Write records where key + val are large (but both can fit in the buffer individually)
+    for (int i = 0; i < numLargeKvPairs; i++) {
+      String key = createRandomString(sizePerBuffer / 2 + Math.abs(random.nextInt(100)));
+      String val = createRandomString(sizePerBuffer / 2 + Math.abs(random.nextInt(100)));
+      keyText.set(key);
+      valText.set(val);
+      int partition = partitioner.getPartition(keyText, valText, numPartitions);
+      partitionsWithData.set(partition);
+      expectedValues.get(partition).put(key, val);
+      kvWriter.write(keyText, valText);
+      numRecordsWritten++;
+    }
+
+    List<Event> events = kvWriter.close();
+    verify(outputContext, never()).fatalError(any(Throwable.class), any(String.class));
+
+    TezCounter outputLargeRecordsCounter = counters.findCounter(TaskCounter.OUTPUT_LARGE_RECORDS);
+    assertEquals(numLargeKeys + numLargeValues + numLargeKvPairs,
+        outputLargeRecordsCounter.getValue());
+
+    // Validate the event
+    assertEquals(1, events.size());
+    assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
+    CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(0);
+    assertEquals(0, cdme.getSourceIndexStart());
+    assertEquals(numPartitions, cdme.getSourceIndexEnd());
+    DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom(cdme
+        .getUserPayload());
+    assertFalse(eventProto.hasData());
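+    // Empty partitions travel in the payload as a compressed byte string encoding a
+    // BitSet; rebuild it only when at least one partition received no data.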
+    BitSet emptyPartitionBits = null;
+    if (partitionsWithData.cardinality() != numPartitions) {
+      assertTrue(eventProto.hasEmptyPartitions());
+      byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(eventProto
+          .getEmptyPartitions());
+      emptyPartitionBits = TezUtils.fromByteArray(emptyPartitions);
+      assertEquals(numPartitions - partitionsWithData.cardinality(),
+          emptyPartitionBits.cardinality());
+    } else {
+      assertFalse(eventProto.hasEmptyPartitions());
+      emptyPartitionBits = new BitSet(numPartitions);
+    }
+    assertEquals(HOST_STRING, eventProto.getHost());
+    assertEquals(SHUFFLE_PORT, eventProto.getPort());
+    assertEquals(uniqueId, eventProto.getPathComponent());
+
+    // Verify the actual data
+    TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId);
+    Path outputFilePath = null;
+    Path spillFilePath = null;
+    try {
+      outputFilePath = taskOutput.getOutputFile();
+    } catch (DiskErrorException e) {
+      if (numRecordsWritten > 0) {
+        fail();
+      } else {
+        // Record checking not required.
+        return;
+      }
+    }
+    try {
+      spillFilePath = taskOutput.getOutputIndexFile();
+    } catch (DiskErrorException e) {
+      if (numRecordsWritten > 0) {
+        fail();
+      } else {
+        // Record checking not required.
+        return;
+      }
+    }
+
+    // The 0-record case returns above, so an output and index file must exist here.
+    TezSpillRecord spillRecord = new TezSpillRecord(spillFilePath, conf);
+    DataInputBuffer keyBuffer = new DataInputBuffer();
+    DataInputBuffer valBuffer = new DataInputBuffer();
+    Text keyDeser = new Text();
+    Text valDeser = new Text();
+    for (int i = 0; i < numPartitions; i++) {
+      if (emptyPartitionBits.get(i)) {
+        continue;
+      }
+      TezIndexRecord indexRecord = spillRecord.getIndex(i);
+      FSDataInputStream inStream = FileSystem.getLocal(conf).open(outputFilePath);
+      inStream.seek(indexRecord.getStartOffset());
+      IFile.Reader reader = new IFile.Reader(inStream, indexRecord.getPartLength(), codec, null,
+          null, false, 0, -1);
+      while (reader.nextRawKey(keyBuffer)) {
+        reader.nextRawValue(valBuffer);
+        keyDeser.readFields(keyBuffer);
+        valDeser.readFields(valBuffer);
+        int partition = partitioner.getPartition(keyDeser, valDeser, numPartitions);
+        assertTrue(expectedValues.get(partition).remove(keyDeser.toString(), valDeser.toString()));
+      }
+      inStream.close();
+    }
+    for (int i = 0; i < numPartitions; i++) {
+      assertEquals(0, expectedValues.get(i).size());
+      expectedValues.remove(i);
+    }
+    assertEquals(0, expectedValues.size());
+  }
+
+  private void baseTest(int numRecords, int numPartitions, Set<Integer> skippedPartitions,
+      boolean shouldCompress) throws IOException, InterruptedException {
+    PartitionerForTest partitioner = new PartitionerForTest();
+    ApplicationId appId = ApplicationId.newInstance(10000, 1);
+    TezCounters counters = new TezCounters();
+    String uniqueId = UUID.randomUUID().toString();
+    TezOutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
+
+    Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
+        shouldCompress, -1);
+    CompressionCodec codec = null;
+    if (shouldCompress) {
+      codec = new DefaultCodec();
+      ((Configurable) codec).setConf(conf);
+    }
+
+    int numOutputs = numPartitions;
+    long availableMemory = 2048;
+    int numRecordsWritten = 0;
+
+    Map<Integer, Multimap<Integer, Long>> expectedValues = new HashMap<Integer, Multimap<Integer, Long>>();
+    for (int i = 0; i < numOutputs; i++) {
+      expectedValues.put(i, LinkedListMultimap.<Integer, Long> create());
+    }
+
+    UnorderedPartitionedKVWriter kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext,
+        conf, numOutputs, availableMemory);
+
+    int sizePerBuffer = kvWriter.sizePerBuffer;
+    int sizePerRecord = 4 + 8; // IntW + LongW
+    int sizePerRecordWithOverhead = sizePerRecord + 12; // Record + META_OVERHEAD
+
+    IntWritable intWritable = new IntWritable();
+    LongWritable longWritable = new LongWritable();
+    for (int i = 0; i < numRecords; i++) {
+      intWritable.set(i);
+      longWritable.set(i);
+      int partition = partitioner.getPartition(intWritable, longWritable, numOutputs);
+      if (skippedPartitions != null && skippedPartitions.contains(partition)) {
+        continue;
+      }
+      expectedValues.get(partition).put(intWritable.get(), longWritable.get());
+      kvWriter.write(intWritable, longWritable);
+      numRecordsWritten++;
+    }
+    List<Event> events = kvWriter.close();
+
+    int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead;
+    int numExpectedSpills = numRecordsWritten / recordsPerBuffer;
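+    // Each buffer holds sizePerBuffer / sizePerRecordWithOverhead records, and every
+    // filled buffer is assumed to trigger exactly one spill, so integer division over
+    // the records written gives the expected spill count.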
+
+    verify(outputContext, never()).fatalError(any(Throwable.class), any(String.class));
+
+    // Verify the status of the buffers
+    if (numExpectedSpills == 0) {
+      assertEquals(1, kvWriter.numInitializedBuffers);
+    } else {
+      assertTrue(kvWriter.numInitializedBuffers > 1);
+    }
+    assertNull(kvWriter.currentBuffer);
+    assertEquals(0, kvWriter.availableBuffers.size());
+
+    // Verify the counters
+    TezCounter outputRecordBytesCounter = counters.findCounter(TaskCounter.OUTPUT_BYTES);
+    TezCounter outputRecordsCounter = counters.findCounter(TaskCounter.OUTPUT_RECORDS);
+    TezCounter outputBytesWithOverheadCounter = counters
+        .findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
+    TezCounter fileOutputBytesCounter = counters.findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
+    TezCounter spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
+    TezCounter additionalSpillBytesWrittenCounter = counters
+        .findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
+    TezCounter additionalSpillBytesReadCounter = counters
+        .findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
+    TezCounter numAdditionalSpillsCounter = counters
+        .findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
+    assertEquals(numRecordsWritten * sizePerRecord, outputRecordBytesCounter.getValue());
+    assertEquals(numRecordsWritten, outputRecordsCounter.getValue());
+    assertEquals(numRecordsWritten * sizePerRecordWithOverhead,
+        outputBytesWithOverheadCounter.getValue());
+    long fileOutputBytes = fileOutputBytesCounter.getValue();
+    if (numRecordsWritten > 0) {
+      assertTrue(fileOutputBytes > 0);
+      if (!shouldCompress) {
+        assertTrue(fileOutputBytes > outputRecordBytesCounter.getValue());
+      }
+    } else {
+      assertEquals(0, fileOutputBytes);
+    }
+    assertEquals(recordsPerBuffer * numExpectedSpills, spilledRecordsCounter.getValue());
+    long additionalSpillBytesWritten = additionalSpillBytesWrittenCounter.getValue();
+    long additionalSpillBytesRead = additionalSpillBytesReadCounter.getValue();
+    if (numExpectedSpills == 0) {
+      assertEquals(0, additionalSpillBytesWritten);
+      assertEquals(0, additionalSpillBytesRead);
+    } else {
+      assertTrue(additionalSpillBytesWritten > 0);
+      assertTrue(additionalSpillBytesRead > 0);
+      if (!shouldCompress) {
+        assertTrue(additionalSpillBytesWritten > (recordsPerBuffer * numExpectedSpills * sizePerRecord));
+        assertTrue(additionalSpillBytesRead > (recordsPerBuffer * numExpectedSpills * sizePerRecord));
+      }
+    }
+    assertEquals(additionalSpillBytesWritten, additionalSpillBytesRead);
+    assertEquals(numExpectedSpills, numAdditionalSpillsCounter.getValue());
+
+    BitSet emptyPartitionBits = null;
+    // Verify the event returned
+    assertEquals(1, events.size());
+    assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
+    CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(0);
+    assertEquals(0, cdme.getSourceIndexStart());
+    assertEquals(numOutputs, cdme.getSourceIndexEnd());
+    DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom(cdme
+        .getUserPayload());
+    assertFalse(eventProto.hasData());
+    if (skippedPartitions == null && numRecordsWritten > 0) {
+      assertFalse(eventProto.hasEmptyPartitions());
+      emptyPartitionBits = new BitSet(numPartitions);
+    } else {
+      assertTrue(eventProto.hasEmptyPartitions());
+      byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(eventProto
+          .getEmptyPartitions());
+      emptyPartitionBits = TezUtils.fromByteArray(emptyPartitions);
+      if (numRecordsWritten == 0) {
+        assertEquals(numPartitions, emptyPartitionBits.cardinality());
+      } else {
+        for (Integer e : skippedPartitions) {
+          assertTrue(emptyPartitionBits.get(e));
+        }
+        assertEquals(skippedPartitions.size(), emptyPartitionBits.cardinality());
+      }
+    }
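+    // Host, port, and path component are only populated when at least one partition
+    // actually produced data.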
+    if (emptyPartitionBits.cardinality() != numPartitions) {
+      assertEquals(HOST_STRING, eventProto.getHost());
+      assertEquals(SHUFFLE_PORT, eventProto.getPort());
+      assertEquals(uniqueId, eventProto.getPathComponent());
+    } else {
+      assertFalse(eventProto.hasHost());
+      assertFalse(eventProto.hasPort());
+      assertFalse(eventProto.hasPathComponent());
+    }
+
+    // Verify the actual data
+    TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId);
+    Path outputFilePath = null;
+    Path spillFilePath = null;
+    try {
+      outputFilePath = taskOutput.getOutputFile();
+    } catch (DiskErrorException e) {
+      if (numRecordsWritten > 0) {
+        fail();
+      } else {
+        // Record checking not required.
+        return;
+      }
+    }
+    try {
+      spillFilePath = taskOutput.getOutputIndexFile();
+    } catch (DiskErrorException e) {
+      if (numRecordsWritten > 0) {
+        fail();
+      } else {
+        // Record checking not required.
+        return;
+      }
+    }
+
+    // The 0-record case returns above, so an output and index file must exist here.
+    TezSpillRecord spillRecord = new TezSpillRecord(spillFilePath, conf);
+    DataInputBuffer keyBuffer = new DataInputBuffer();
+    DataInputBuffer valBuffer = new DataInputBuffer();
+    IntWritable keyDeser = new IntWritable();
+    LongWritable valDeser = new LongWritable();
+    for (int i = 0; i < numOutputs; i++) {
+      if (skippedPartitions != null && skippedPartitions.contains(i)) {
+        continue;
+      }
+      TezIndexRecord indexRecord = spillRecord.getIndex(i);
+      FSDataInputStream inStream = FileSystem.getLocal(conf).open(outputFilePath);
+      inStream.seek(indexRecord.getStartOffset());
+      IFile.Reader reader = new IFile.Reader(inStream, indexRecord.getPartLength(), codec, null,
+          null, false, 0, -1);
+      while (reader.nextRawKey(keyBuffer)) {
+        reader.nextRawValue(valBuffer);
+        keyDeser.readFields(keyBuffer);
+        valDeser.readFields(valBuffer);
+        int partition = partitioner.getPartition(keyDeser, valDeser, numOutputs);
+        assertTrue(expectedValues.get(partition).remove(keyDeser.get(), valDeser.get()));
+      }
+      inStream.close();
+    }
+    for (int i = 0; i < numOutputs; i++) {
+      assertEquals(0, expectedValues.get(i).size());
+      expectedValues.remove(i);
+    }
+    assertEquals(0, expectedValues.size());
+  }
+
+  private static String createRandomString(int size) {
+    StringBuilder sb = new StringBuilder();
+    Random random = new Random();
+    for (int i = 0; i < size; i++) {
+      // nextInt(26) is always non-negative; Math.abs(random.nextInt()) can overflow
+      // for Integer.MIN_VALUE.
+      int r = random.nextInt(26);
+      sb.append((char) ('A' + r));
+    }
+    return sb.toString();
+  }
+
+  private TezOutputContext createMockOutputContext(TezCounters counters, ApplicationId appId,
+      String uniqueId) {
+    TezOutputContext outputContext = mock(TezOutputContext.class);
+    doReturn(counters).when(outputContext).getCounters();
+    doReturn(appId).when(outputContext).getApplicationId();
+    doReturn(1).when(outputContext).getDAGAttemptNumber();
+    doReturn("dagName").when(outputContext).getDAGName();
+    doReturn("destinationVertexName").when(outputContext).getDestinationVertexName();
+    doReturn(1).when(outputContext).getOutputIndex();
+    doReturn(1).when(outputContext).getTaskAttemptNumber();
+    doReturn(1).when(outputContext).getTaskIndex();
+    doReturn(1).when(outputContext).getTaskVertexIndex();
+    doReturn("vertexName").when(outputContext).getTaskVertexName();
+    doReturn(uniqueId).when(outputContext).getUniqueIdentifier();
+    ByteBuffer portBuffer = ByteBuffer.allocate(4);
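+    // mark() at position 0, write the port, then reset() so readers start at the int.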
+    portBuffer.mark();
+    portBuffer.putInt(SHUFFLE_PORT);
+    portBuffer.reset();
+    doReturn(portBuffer).when(outputContext).getServiceProviderMetaData(
+        eq(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID));
+    Path outDirBase = new Path(TEST_ROOT_DIR, "outDir_" + uniqueId);
+    String[] outDirs = new String[] { outDirBase.toString() };
+    doReturn(outDirs).when(outputContext).getWorkDirs();
+    return outputContext;
+  }
+
+  private Configuration createConfiguration(TezOutputContext outputContext,
+      Class<? extends Writable> keyClass, Class<? extends Writable> valClass,
+      boolean shouldCompress, int maxSingleBufferSizeBytes) {
+    return createConfiguration(outputContext, keyClass, valClass, shouldCompress,
+        maxSingleBufferSizeBytes, PartitionerForTest.class);
+  }
+
+  private Configuration createConfiguration(TezOutputContext outputContext,
+      Class<? extends Writable> keyClass, Class<? extends Writable> valClass,
+      boolean shouldCompress, int maxSingleBufferSizeBytes,
+      Class<? extends Partitioner> partitionerClass) {
+    Configuration conf = new Configuration(false);
+    conf.setStrings(TezJobConfig.LOCAL_DIRS, outputContext.getWorkDirs());
+    conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, keyClass.getName());
+    conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, valClass.getName());
+    conf.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS, partitionerClass.getName());
+    if (maxSingleBufferSizeBytes >= 0) {
+      conf.setInt(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES,
+          maxSingleBufferSizeBytes);
+    }
+    conf.setBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, shouldCompress);
+    if (shouldCompress) {
+      conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC,
+          DefaultCodec.class.getName());
+    }
+    return conf;
+  }
+
+  public static class PartitionerForTest implements Partitioner {
+    @Override
+    public int getPartition(Object key, Object value, int numPartitions) {
+      if (key instanceof IntWritable) {
+        return ((IntWritable) key).get() % numPartitions;
+      } else {
+        throw new UnsupportedOperationException(
+            "Test partitioner expected to be called with IntWritable only");
+      }
+    }
+  }
+
+  private static class UnorderedPartitionedKVWriterForTest extends UnorderedPartitionedKVWriter {
+
+    public UnorderedPartitionedKVWriterForTest(TezOutputContext outputContext, Configuration conf,
+        int numOutputs, long availableMemoryBytes) throws IOException {
+      super(outputContext, conf, numOutputs, availableMemoryBytes);
+    }
+
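+    // Pins the reported host so the data movement event assertions stay deterministic.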
+    @Override
+    String getHost() {
+      return HOST_STRING;
+    }
+  }
+}