You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/10/28 21:35:08 UTC

tez git commit: TEZ-2244. PipelinedSorter: Progressive allocation for sort-buffers (rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/master 70096c169 -> 149a04c28


TEZ-2244. PipelinedSorter: Progressive allocation for sort-buffers (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/149a04c2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/149a04c2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/149a04c2

Branch: refs/heads/master
Commit: 149a04c28d53988fd49510c5453fc19ac0d1030e
Parents: 70096c1
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Wed Oct 28 13:35:36 2015 -0700
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Wed Oct 28 13:35:36 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../library/api/TezRuntimeConfiguration.java    |  28 ++
 .../common/sort/impl/PipelinedSorter.java       | 187 +++++++----
 .../output/OrderedPartitionedKVOutput.java      |   2 +
 .../common/sort/impl/TestPipelinedSorter.java   | 328 ++++++++++++++++---
 .../TestOrderedPartitionedKVEdgeConfig.java     |   9 +
 6 files changed, 457 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/149a04c2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4f58119..14e2bbe 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.2: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2244. PipelinedSorter: Progressive allocation for sort-buffers
   TEZ-2904. Pig can't specify task specific command opts
   TEZ-2888. Make critical path calculation resilient to AM crash
   TEZ-2899. Tez UI: DAG getting created with huge horizontal gap in between vertices

http://git-wip-us.apache.org/repos/asf/tez/blob/149a04c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index cf05546..caad6ef 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -128,6 +128,31 @@ public class TezRuntimeConfiguration {
       "combine.min.spills";
   public static final int TEZ_RUNTIME_COMBINE_MIN_SPILLS_DEFAULT = 3;
 
+  /**
+   * Tries to allocate {@link #TEZ_RUNTIME_IO_SORT_MB} in chunks of the size
+   * specified by this parameter.
+   */
+  @ConfigurationProperty(type = "integer")
+  public static final String
+      TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB = TEZ_RUNTIME_PREFIX +
+      "pipelined.sorter.min-block.size.in.mb";
+  public static final int
+      TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB_DEFAULT = 2000;
+
+  /**
+   * Setting this to true enables the sorter to allocate memory
+   * progressively, on an as-needed basis.
+   *
+   * Setting this to false allocates all available memory during
+   * initialization of the sorter. In that case, {@link #TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB}
+   * is honored and the memory specified in {@link #TEZ_RUNTIME_IO_SORT_MB}
+   * is allocated upfront.
+   */
+  @ConfigurationProperty(type = "boolean")
+  public static final String TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY = TEZ_RUNTIME_PREFIX +
+      "pipelined.sorter.lazy-allocate.memory";
+  public static final boolean
+      TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY_DEFAULT = false;
 
   /**
    * String value.
@@ -498,6 +523,9 @@ public class TezRuntimeConfiguration {
     tezRuntimeKeys.add(TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
     tezRuntimeKeys.add(TEZ_RUNTIME_COMBINE_MIN_SPILLS);
     tezRuntimeKeys.add(TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS);
+    tezRuntimeKeys.add(
+        TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB);
+    tezRuntimeKeys.add(TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY);
     tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB);
     tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES);
     tezRuntimeKeys.add(TEZ_RUNTIME_PARTITIONER_CLASS);

http://git-wip-us.apache.org/repos/asf/tez/blob/149a04c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 81f5211..f512a5d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -25,9 +25,7 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.IntBuffer;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.PriorityQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -87,10 +85,6 @@ public class PipelinedSorter extends ExternalSorter {
   private final ProxyComparator hasher;
   // SortSpans  
   private SortSpan span;
-  //Maintain a bunch of ByteBuffers (each of them can hold approximately 2 GB data)
-  @VisibleForTesting
-  protected final LinkedList<ByteBuffer> bufferList = new LinkedList<ByteBuffer>();
-  private ListIterator<ByteBuffer> listIterator;
 
   //total memory capacity allocated to sorter
   private final long capacity;
@@ -98,9 +92,6 @@ public class PipelinedSorter extends ExternalSorter {
   //track buffer overflow recursively in all buffers
   private int bufferOverflowRecursion;
 
-  private final int blockSize;
-
-
   // Merger
   private final SpanMerger merger; 
   private final ExecutorService sortmaster;
@@ -110,17 +101,44 @@ public class PipelinedSorter extends ExternalSorter {
 
   private final boolean pipelinedShuffle;
 
+  private long currentAllocatableMemory;
+  //Maintain a list of ByteBuffers
+  @VisibleForTesting
+  final List<ByteBuffer> buffers;
+  final int maxNumberOfBlocks;
+  private int bufferIndex = -1;
+  private final int MIN_BLOCK_SIZE;
+  private final boolean lazyAllocateMem;
+
   // TODO Set additional countesr - total bytes written, spills etc.
 
   public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
       long initialMemoryAvailable) throws IOException {
-    this(outputContext,conf,numOutputs, initialMemoryAvailable, 0);
-  }
-
-  PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
-      long initialMemoryAvailable, int blkSize) throws IOException {
     super(outputContext, conf, numOutputs, initialMemoryAvailable);
 
+    lazyAllocateMem = this.conf.getBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY_DEFAULT);
+
+    if (lazyAllocateMem) {
+      /**
+       * When lazy allocation is enabled, the framework allocates memory
+       * automatically as needed. The desired block size is set to 256 MB.
+       */
+      MIN_BLOCK_SIZE = 256 << 20; //256 MB
+    } else {
+      int minBlockSize = conf.getInt(TezRuntimeConfiguration
+              .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB,
+          TezRuntimeConfiguration
+              .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB_DEFAULT);
+      Preconditions.checkArgument(
+          (minBlockSize > 0 && minBlockSize < 2047),
+          TezRuntimeConfiguration
+              .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB
+              + "=" + minBlockSize + " should be a positive value between 0 and 2047");
+      MIN_BLOCK_SIZE = minBlockSize << 20;
+    }
+
     StringBuilder initialSetupLogLine = new StringBuilder("Setting up PipelinedSorter for ")
         .append(outputContext.getDestinationVertexName()).append(": ");
     partitionBits = bitcount(partitions)+1;
@@ -135,23 +153,7 @@ public class PipelinedSorter extends ExternalSorter {
     final long sortmb = this.availableMemoryMb;
 
     // buffers and accounting
-    long maxMemUsage = sortmb << 20;
-
-    this.blockSize = computeBlockSize(blkSize, maxMemUsage);
-
-    long usage = sortmb << 20;
-    //Divide total memory into different blocks.
-    int numberOfBlocks = Math.max(1, (int) Math.ceil(1.0 * usage / blockSize));
-    initialSetupLogLine.append("#blocks=").append(numberOfBlocks);
-    initialSetupLogLine.append(", maxMemUsage=").append(maxMemUsage);
-    initialSetupLogLine.append(", BLOCK_SIZE=").append(blockSize);
-    initialSetupLogLine.append(", finalMergeEnabled=").append(isFinalMergeEnabled());
-    initialSetupLogLine.append(", pipelinedShuffle=").append(pipelinedShuffle);
-    initialSetupLogLine.append(", sendEmptyPartitions=").append(sendEmptyPartitionDetails);
-    initialSetupLogLine.append(", ").append(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB).append(
-        "=").append(
-        sortmb);
-
+    long maxMemLimit = sortmb << 20;
 
     initialSetupLogLine.append(", UsingHashComparator=");
     // k/v serialization
@@ -166,20 +168,43 @@ public class PipelinedSorter extends ExternalSorter {
     LOG.info(initialSetupLogLine.toString());
 
     long totalCapacityWithoutMeta = 0;
-    for (int i = 0; i < numberOfBlocks; i++) {
-      Preconditions.checkArgument(usage > 0, "usage can't be less than zero " + usage);
-      long size = Math.min(usage, blockSize);
+    long availableMem = maxMemLimit;
+    int numBlocks = 0;
+    while(availableMem > 0) {
+      long size = Math.min(availableMem, computeBlockSize(availableMem, maxMemLimit));
       int sizeWithoutMeta = (int) ((size) - (size % METASIZE));
-      bufferList.add(ByteBuffer.allocate(sizeWithoutMeta));
       totalCapacityWithoutMeta += sizeWithoutMeta;
-      usage -= size;
+      availableMem -= size;
+      numBlocks++;
     }
+    currentAllocatableMemory = maxMemLimit;
+    maxNumberOfBlocks = numBlocks;
     capacity = totalCapacityWithoutMeta;
-    listIterator = bufferList.listIterator();
 
+    buffers = Lists.newArrayListWithCapacity(maxNumberOfBlocks);
+    allocateSpace(); //Allocate the first block
+    if (!lazyAllocateMem) {
+      LOG.info("Pre allocating rest of memory buffers upfront");
+      while(allocateSpace() != null);
+    }
 
-    Preconditions.checkArgument(listIterator.hasNext(), "Buffer list seems to be empty " + bufferList.size());
-    span = new SortSpan(listIterator.next(), 1024*1024, 16, this.comparator);
+    initialSetupLogLine.append("#blocks=").append(maxNumberOfBlocks);
+    initialSetupLogLine.append(", maxMemUsage=").append(maxMemLimit);
+    initialSetupLogLine.append(", lazyAllocateMem=").append(
+        lazyAllocateMem);
+    initialSetupLogLine.append(", minBlockSize=").append(MIN_BLOCK_SIZE);
+    initialSetupLogLine.append(", initial BLOCK_SIZE=").append(buffers.get(0).capacity());
+    initialSetupLogLine.append(", finalMergeEnabled=").append(isFinalMergeEnabled());
+    initialSetupLogLine.append(", pipelinedShuffle=").append(pipelinedShuffle);
+    initialSetupLogLine.append(", sendEmptyPartitions=").append(sendEmptyPartitionDetails);
+    initialSetupLogLine.append(", ").append(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB).append(
+        "=").append(
+        sortmb);
+
+    Preconditions.checkState(buffers.size() > 0, "Atleast one buffer needs to be present");
+    LOG.info(initialSetupLogLine.toString());
+
+    span = new SortSpan(buffers.get(bufferIndex), 1024 * 1024, 16, this.comparator);
     merger = new SpanMerger(); // SpanIterators are comparable
     final int sortThreads = 
             this.conf.getInt(
@@ -197,18 +222,67 @@ public class PipelinedSorter extends ExternalSorter {
     minSpillsForCombine = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
   }
 
+  ByteBuffer allocateSpace() {
+    if (currentAllocatableMemory <= 0) {
+      //No space available.
+      return null;
+    }
+
+    int size = computeBlockSize(currentAllocatableMemory, availableMemoryMb << 20);
+    currentAllocatableMemory -= size;
+    int sizeWithoutMeta = (size) - (size % METASIZE);
+    ByteBuffer space = ByteBuffer.allocate(sizeWithoutMeta);
+
+    buffers.add(space);
+    bufferIndex++;
+
+    Preconditions.checkState(buffers.size() <= maxNumberOfBlocks,
+        "Number of blocks " + buffers.size()
+            + " is exceeding  " + maxNumberOfBlocks);
+
+    LOG.info("Newly allocated block size=" + size
+        + ", index=" + bufferIndex
+        + ", Number of buffers=" + buffers.size()
+        + ", currentAllocatableMemory=" + currentAllocatableMemory
+        + ", currentBufferSize=" + space.capacity()
+        + ", total=" + (availableMemoryMb << 20));
+    return space;
+  }
+
+
   @VisibleForTesting
-  static int computeBlockSize(int blkSize, long maxMemUsage) {
-    if (blkSize == 0) {
-      return (int) Math.min(maxMemUsage, Integer.MAX_VALUE);
-    } else {
-      Preconditions.checkArgument(blkSize > 0, "blkSize should be between 1 and Integer.MAX_VALUE");
-      if (blkSize >= maxMemUsage) {
-        return (maxMemUsage > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) maxMemUsage;
-      } else {
-        return blkSize;
+  int computeBlockSize(long availableMem, long maxAllocatedMemory) {
+    int maxBlockSize = 0;
+    /**
+     * When lazy-allocation is enabled, framework takes care of auto allocating
+     * memory on need basis. In such cases, first buffer starts with 32 MB.
+     */
+    if (lazyAllocateMem) {
+      if (buffers == null || buffers.isEmpty()) {
+        return 32 << 20; //32 MB
+      }
+    }
+
+    //Honor MIN_BLOCK_SIZE
+    maxBlockSize = Math.max(MIN_BLOCK_SIZE, maxBlockSize);
+
+    if (availableMem < maxBlockSize) {
+      maxBlockSize = (int) availableMem;
+    }
+
+    int maxMem = (maxAllocatedMemory > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) maxAllocatedMemory;
+    if (maxBlockSize > maxMem) {
+      maxBlockSize = maxMem;
+    }
+
+    availableMem -= maxBlockSize;
+    if (availableMem < MIN_BLOCK_SIZE) {
+      if ((maxBlockSize + availableMem) < Integer.MAX_VALUE) {
+        //Merge remaining with last block
+        maxBlockSize += availableMem;
       }
     }
+    return maxBlockSize;
   }
 
   private int bitcount(int n) {
@@ -237,8 +311,8 @@ public class PipelinedSorter extends ExternalSorter {
       if (pipelinedShuffle && ret) {
         sendPipelinedShuffleEvents();
       }
-      //safe to reset the iterator
-      listIterator = bufferList.listIterator();
+      //safe to reset bufferIndex to 0;
+      bufferIndex = 0;
       int items = 1024*1024;
       int perItem = 16;
       if(span.length() != 0) {
@@ -250,9 +324,9 @@ public class PipelinedSorter extends ExternalSorter {
             items = 1024*1024;
         }
       }
-      Preconditions.checkArgument(listIterator.hasNext(), "block iterator should not be empty");
+      Preconditions.checkArgument(buffers.get(bufferIndex) != null, "block should not be empty");
       //TODO: fix per item being passed.
-      span = new SortSpan((ByteBuffer)listIterator.next().clear(), (1024*1024),
+      span = new SortSpan((ByteBuffer)buffers.get(bufferIndex).clear(), (1024*1024),
           perItem, ConfigUtils.getIntermediateOutputKeyComparator(this.conf));
     } else {
       // queue up the sort
@@ -325,7 +399,7 @@ public class PipelinedSorter extends ExternalSorter {
       // restore limit
       span.kvbuffer.position(keystart);
       this.sort();
-      if (span.length() == 0 || bufferOverflowRecursion > bufferList.size()) {
+      if (span.length() == 0 || bufferOverflowRecursion > buffers.size()) {
         // spill the current key value pair
         spillSingleRecord(key, value, partition);
         bufferOverflowRecursion = 0;
@@ -562,7 +636,7 @@ public class PipelinedSorter extends ExternalSorter {
       sortmaster.shutdown();
 
       //safe to clean up
-      bufferList.clear();
+      buffers.clear();
 
 
       if(indexCacheList.isEmpty()) {
@@ -911,11 +985,12 @@ public class PipelinedSorter extends ExternalSorter {
       LOG.info(outputContext.getDestinationVertexName() + ": " + String.format("Span%d.length = %d, perItem = %d", index, length(), perItem));
       if(remaining.remaining() < METASIZE+perItem) {
         //Check if we can get the next Buffer from the main buffer list
-        if (listIterator.hasNext()) {
+        ByteBuffer space = allocateSpace();
+        if (space != null) {
           LOG.info(outputContext.getDestinationVertexName() + ": " + "Getting memory from next block in the list, recordsWritten=" +
               mapOutputRecordCounter.getValue());
           reinit = true;
-          return listIterator.next();
+          return space;
         }
         return null;
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/149a04c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 45b6713..c0b0760 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -225,6 +225,8 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INTERNAL_SORTER_CLASS);

http://git-wip-us.apache.org/repos/asf/tez/blob/149a04c2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 92163c4..2cebea4 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -64,7 +64,6 @@ import static org.mockito.internal.verification.VerificationModeFactory.times;
  * limitations under the License.
  */
 public class TestPipelinedSorter {
-  private static Configuration conf = new Configuration();
   private static FileSystem localFs = null;
   private static Path workDir = null;
   private OutputContext outputContext;
@@ -76,7 +75,7 @@ public class TestPipelinedSorter {
   private static TreeMap<String, String> sortedDataMap = Maps.newTreeMap();
 
   static {
-    conf.set("fs.defaultFS", "file:///");
+    Configuration conf = getConf();
     try {
       localFs = FileSystem.getLocal(conf);
       workDir = new Path(
@@ -99,7 +98,11 @@ public class TestPipelinedSorter {
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
     this.outputContext = createMockOutputContext(counters, appId, uniqueId);
+  }
 
+  public static Configuration getConf() {
+    Configuration conf = new Configuration();
+    conf.set("fs.defaultFS", "file:///");
     //To enable PipelinedSorter
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.PIPELINED.name());
 
@@ -110,15 +113,17 @@ public class TestPipelinedSorter {
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
 
     //Setup localdirs
-    String localDirs = workDir.toString();
-    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
+    if (workDir != null) {
+      String localDirs = workDir.toString();
+      conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
+    }
+    return conf;
   }
 
   @After
   public void reset() throws IOException {
     cleanup();
     localFs.mkdirs(workDir);
-    conf = new Configuration();
   }
 
   @Test
@@ -126,6 +131,7 @@ public class TestPipelinedSorter {
     //TODO: need to support multiple partition testing later
 
     //# partition, # of keys, size per key, InitialMem, blockSize
+    Configuration conf = getConf();
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
     basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20);
 
@@ -133,6 +139,7 @@ public class TestPipelinedSorter {
 
   @Test
   public void testWithoutPartitionStats() throws IOException {
+    Configuration conf = getConf();
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, false);
     //# partition, # of keys, size per key, InitialMem, blockSize
     basicTest(1, 0, 0, (10 * 1024l * 1024l), 3 << 20);
@@ -141,6 +148,7 @@ public class TestPipelinedSorter {
 
   @Test
   public void testWithEmptyData() throws IOException {
+    Configuration conf = getConf();
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
     //# partition, # of keys, size per key, InitialMem, blockSize
     basicTest(1, 0, 0, (10 * 1024l * 1024l), 3 << 20);
@@ -150,9 +158,12 @@ public class TestPipelinedSorter {
   public void testEmptyDataWithPipelinedShuffle() throws IOException {
     this.numOutputs = 1;
     this.initialAvailableMem = 1 *1024 * 1024;
+    Configuration conf = getConf();
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1);
     PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
-        initialAvailableMem, 1 << 20);
+        initialAvailableMem);
 
     writeData(sorter, 0, 1<<20);
 
@@ -186,16 +197,19 @@ public class TestPipelinedSorter {
   @Test
   public void testKVExceedsBuffer2() throws IOException {
     // a list of 4 blocks each 256kb, 2KV pair, key 1mb, value 1mb
-    basicTest(1, 2, (1 << 20), (1 * 1024l * 1024l), 256<<10);
+    basicTest(1, 2, (1 << 20), (1 * 1024l * 1024l), 256<<20);
   }
 
   @Test
   public void testExceedsKVWithMultiplePartitions() throws IOException {
+    Configuration conf = getConf();
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
     this.numOutputs = 5;
     this.initialAvailableMem = 1 * 1024 * 1024;
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1);
     PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
-        initialAvailableMem, 0);
+        initialAvailableMem);
 
     writeData(sorter, 100, 1<<20);
     verifyCounters(sorter, outputContext);
@@ -205,9 +219,12 @@ public class TestPipelinedSorter {
   public void testExceedsKVWithPipelinedShuffle() throws IOException {
     this.numOutputs = 1;
     this.initialAvailableMem = 1 *1024 * 1024;
+    Configuration conf = getConf();
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1);
     PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
-        initialAvailableMem, 1 << 20);
+        initialAvailableMem);
 
     writeData(sorter, 5, 1<<20);
 
@@ -222,9 +239,12 @@ public class TestPipelinedSorter {
   public void test_TEZ_2602_50mb() throws IOException {
     this.numOutputs = 1;
     this.initialAvailableMem = 1 *1024 * 1024;
+    Configuration conf = getConf();
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1);
     PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
-        initialAvailableMem, 1 << 20);
+        initialAvailableMem);
 
     Text value = new Text("1");
     long size = 50 * 1024 * 1024;
@@ -238,13 +258,14 @@ public class TestPipelinedSorter {
     sorter.close();
   }
 
-  @Test
+  //@Test
   public void testLargeDataWithMixedKV() throws IOException {
     this.numOutputs = 1;
     this.initialAvailableMem = 48 *1024 * 1024;
+    Configuration conf = getConf();
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
     PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
-        initialAvailableMem, 0);
+        initialAvailableMem);
 
     //write 10 MB KV
     Text key = new Text(RandomStringUtils.randomAlphanumeric(10 << 20));
@@ -293,13 +314,15 @@ public class TestPipelinedSorter {
     // 20 KVpairs of 2X10kb, 10 KV of 2X200kb, 20KV of 2X10kb
     int numkeys[] = {20, 10, 20};
     int keylens[] = {10<<10, 200<<10, 10<<10};
-    basicTest2(1, numkeys, keylens, (1 * 1024l * 1024l), 1 << 18);
+    basicTest2(1, numkeys, keylens, (10 * 1024l * 1024l), 2);
   }
 
   @Test
   public void testWithCustomComparator() throws IOException {
     //Test with custom comparator
-    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, CustomComparator.class.getName());
+    Configuration conf = getConf();
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+        CustomComparator.class.getName());
     basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20);
   }
 
@@ -307,10 +330,13 @@ public class TestPipelinedSorter {
   public void testWithPipelinedShuffle() throws IOException {
     this.numOutputs = 1;
     this.initialAvailableMem = 5 *1024 * 1024;
+    Configuration conf = getConf();
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1);
     PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
-        initialAvailableMem, 1 << 20);
+        initialAvailableMem);
 
     //Write 100 keys each of size 10
     writeData(sorter, 10000, 100);
@@ -323,11 +349,14 @@ public class TestPipelinedSorter {
 
   @Test
   public void testCountersWithMultiplePartitions() throws IOException {
+    Configuration conf = getConf();
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
     this.numOutputs = 5;
     this.initialAvailableMem = 5 * 1024 * 1024;
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1);
     PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
-        initialAvailableMem, 0);
+        initialAvailableMem);
 
     writeData(sorter, 10000, 100);
     verifyCounters(sorter, outputContext);
@@ -336,8 +365,11 @@ public class TestPipelinedSorter {
   public void basicTest2(int partitions, int[] numkeys, int[] keysize,
       long initialAvailableMem, int  blockSize) throws IOException {
     this.numOutputs = partitions; // single output
+    Configuration conf = getConf();
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 100);
     PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
-        initialAvailableMem, blockSize);
+        initialAvailableMem);
     writeData2(sorter, numkeys, keysize);
     verifyCounters(sorter, outputContext);
   }
@@ -360,10 +392,13 @@ public class TestPipelinedSorter {
   }
 
   public void basicTest(int partitions, int numKeys, int keySize,
-      long initialAvailableMem, int blockSize) throws IOException {
+      long initialAvailableMem, int minBlockSize) throws IOException {
     this.numOutputs = partitions; // single output
+    Configuration conf = getConf();
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, minBlockSize >> 20);
     PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
-        initialAvailableMem, blockSize);
+        initialAvailableMem);
 
     writeData(sorter, numKeys, keySize);
 
@@ -420,50 +455,250 @@ public class TestPipelinedSorter {
 
 
   @Test
+  //Intentionally not having timeout
   //Its not possible to allocate > 2 GB in test environment.  Carry out basic checks here.
   public void memTest() throws IOException {
     //Verify if > 2 GB can be set via config
+    Configuration conf = getConf();
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3076);
     long size = ExternalSorter.getInitialMemoryRequirement(conf, 4096 * 1024 * 1024l);
     Assert.assertTrue(size == (3076l << 20));
 
     //Verify number of block buffers allocated
     this.initialAvailableMem = 10 * 1024 * 1024;
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1);
     PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
-        initialAvailableMem, 1 << 20);
-    Assert.assertTrue(sorter.bufferList.size() == 10);
+        initialAvailableMem);
+    Assert.assertTrue(sorter.maxNumberOfBlocks == 10);
 
+    //10 MB available, request for 3 MB chunk. Last 1 MB gets added to previous chunk.
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 3);
     sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
-        initialAvailableMem, 3 << 20);
-    Assert.assertTrue(sorter.bufferList.size() == 4);
+        initialAvailableMem);
+    Assert.assertTrue(sorter.maxNumberOfBlocks == 3);
 
+    //10 MB available, request for 10 MB min chunk.  Would get 1 block.
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 10);
     sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
-        initialAvailableMem, 10 << 20);
-    Assert.assertTrue(sorter.bufferList.size() == 1);
+        initialAvailableMem);
+    Assert.assertTrue(sorter.maxNumberOfBlocks == 1);
+
+    //Verify block sizes (10 MB min chunk size), but available mem is zero.
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 10);
+    sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, initialAvailableMem);
+    Assert.assertTrue(sorter.maxNumberOfBlocks == 1);
+    int blockSize = sorter.computeBlockSize(0, (10 << 20));
+    //available is zero. Can't allocate any more buffer.
+    Assert.assertTrue(blockSize == 0);
+
+    //300 MB available. Request for 200 MB min block size. It would allocate a block with 200 MB,
+    // but last 100 would get clubbed. Hence, it would return 300 MB block.
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 200);
+    sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, (300 << 20));
+    Assert.assertTrue(sorter.maxNumberOfBlocks == 1);
+    blockSize = sorter.computeBlockSize((300 << 20), (300 << 20));
+    Assert.assertTrue(blockSize == (300 << 20));
+
+    //300 MB available. Request for 3500 MB min block size. throw exception
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 3500);
+    try {
+      sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
+          (300 << 20));
+    } catch(IllegalArgumentException iae ) {
+      assertTrue(iae.getMessage().contains("positive value between 0 and 2047"));
+    }
 
-    //Verify block sizes
-    int blockSize = PipelinedSorter.computeBlockSize(0, (10 * 1024 * 1024));
-    //initialAvailableMem is < 2 GB. So consider it as the blockSize
-    Assert.assertTrue(blockSize == (10 * 1024 * 1024));
+    //64 MB available. Request for 32 MB min block size.
+    conf.setBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false);
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 32);
+    sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, (64 << 20));
+    Assert.assertTrue(sorter.maxNumberOfBlocks == 2);
+    blockSize = sorter.computeBlockSize((64 << 20), (64 << 20));
+    Assert.assertTrue(blockSize == (32 << 20));
+
+    blockSize = sorter.computeBlockSize((32 << 20), (64 << 20));
+    Assert.assertTrue(blockSize == (32 << 20));
+
+    blockSize = sorter.computeBlockSize((48 << 20), (64 << 20));
+    Assert.assertTrue(blockSize == (48 << 20));
+
+    //64 MB available. Request for 8 MB min block size.
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 8);
+    sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, (64 << 20));
+    Assert.assertTrue(sorter.maxNumberOfBlocks == 8);
+    blockSize = sorter.computeBlockSize((64 << 20), (64 << 20));
+    //Should return 8 MB, which is the min block size.
+    Assert.assertTrue(blockSize == (8 << 20));
+  }
 
-    blockSize = PipelinedSorter.computeBlockSize(0, (10 * 1024 * 1024 * 1024l));
-    //initialAvailableMem is > 2 GB. Restrict block size to Integer.MAX_VALUE;
-    Assert.assertTrue(blockSize == Integer.MAX_VALUE);
+  @Test
+  //Intentionally not having timeout
+  public void test_without_lazyMemAllocation() throws IOException {
+    this.numOutputs = 10;
+    Configuration conf = getConf();
+
+    //128 MB. Pre-allocate. Request for default block size. Get 1 buffer
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128);
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB,
+        TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB_DEFAULT);
+    conf.setBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false);
+    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf,
+        numOutputs, (128l << 20));
+    assertTrue("Expected 1 sort buffers. current len=" + sorter.buffers.size(),
+        sorter.buffers.size() == 1);
+
+    //128 MB. Pre-allocate. Get 2 buffer
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128);
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 62);
+    conf.setBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false);
+    sorter = new PipelinedSorter(this.outputContext, conf,
+        numOutputs, (128l << 20));
+    assertTrue("Expected 2 sort buffers. current len=" + sorter.buffers.size(),
+        sorter.buffers.size() == 2);
+
+    //48 MB. Pre-allocate. But request for a larger min block size (62 MB). Get 1 buffer
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 48);
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 62);
+    conf.setBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false);
+    sorter = new PipelinedSorter(this.outputContext, conf,
+        numOutputs, (48l << 20));
+    assertTrue("Expected 1 sort buffers. current len=" + sorter.buffers.size(),
+        sorter.buffers.size() == 1);
+  }
+
+  @Test
+  //Intentionally not having timeout
+  public void test_with_lazyMemAllocation() throws IOException {
+    this.numOutputs = 10;
+    Configuration conf = getConf();
+
+    //128 MB. Do not pre-allocate.
+    // Get a 32 MB buffer first and then another buffer with 96 MB on filling up
+    // the 32 MB buffer.
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128);
+    conf.setBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true);
+    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf,
+        numOutputs, (128l << 20));
+    assertTrue("Expected 1 sort buffers. current len=" + sorter.buffers.size(),
+        sorter.buffers.size() == 1);
+    assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024);
+    writeData(sorter, 100, 1024*1024, false); //100 1 MB KV. Will spill
+
+    //Now it should have created 2 buffers, 32 & 96 MB buffers.
+    assertTrue(sorter.buffers.size() == 2);
+    assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024);
+    assertTrue(sorter.buffers.get(1).capacity() == 96 * 1024 * 1024);
+    closeSorter(sorter);
+    verifyCounters(sorter, outputContext);
 
-    blockSize = PipelinedSorter.computeBlockSize((1*1024*1024*1024), (10 * 1024 * 1024));
-    //sort buffer is 10 MB. But block size requested is 1 GB. Restrict block size to 10 MB.
-    Assert.assertTrue(blockSize == (10 * 1024 * 1024));
+    //TODO: Not sure if this would fail in build machines due to mem
+    //300 MB. Do not pre-allocate.
+    // Get 1 buffer with 32 MB. But grow to 2 buffers as data is written
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 300);
+    conf.setBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true);
+    sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, (300l << 20));
+    assertTrue(sorter.buffers.size() == 1);
+    assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024);
+
+    writeData(sorter, 50, 1024*1024, false); //50 1 MB KV to allocate 2nd buf
+    assertTrue(sorter.buffers.size() == 2);
+    assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024);
+    assertTrue(sorter.buffers.get(1).capacity() == 268 * 1024 * 1024);
+
+    //48 MB. Do not pre-allocate.
+    // Get 32 MB buffer first invariably and proceed with the rest.
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 48);
+    conf.setBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true);
+    sorter = new PipelinedSorter(this.outputContext, conf,
+        numOutputs, (48l << 20));
+    assertTrue("Expected 1 sort buffers. current len=" + sorter.buffers.size(),
+        sorter.buffers.size() == 1);
+    assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024);
+    writeData(sorter, 20, 1024*1024, false); //20 1 MB KV. Will spill
+
+    //Now it should have created 2 buffers, 32 & 16 MB buffers.
+    assertTrue(sorter.buffers.size() == 2);
+    assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024);
+    assertTrue(sorter.buffers.get(1).capacity() == 16 * 1024 * 1024);
+    closeSorter(sorter);
+  }
 
+  @Test
+  //Intentionally not having timeout
+  public void testLazyAllocateMem() throws IOException {
+    this.numOutputs = 10;
+    Configuration conf = getConf();
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128);
+    conf.setBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false);
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 4500);
     try {
-      blockSize = PipelinedSorter.computeBlockSize(-1, (10 * 1024 * 1024 * 1024l));
-      //block size can't set to -1
-      fail();
-    } catch(IllegalArgumentException e) {
-      Assert.assertTrue(e.getMessage().contains("should be between 1 and Integer.MAX_VALUE"));
+      PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf,
+          numOutputs, (4500l << 20));
+    } catch (IllegalArgumentException iae) {
+      assertTrue(iae.getMessage().contains(TezRuntimeConfiguration
+          .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB));
+      assertTrue(iae.getMessage().contains("value between 0 and 2047"));
     }
+
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, -1);
+    try {
+      PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf,
+          numOutputs, (4500l << 20));
+    } catch (IllegalArgumentException iae) {
+      assertTrue(iae.getMessage().contains(TezRuntimeConfiguration
+          .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB));
+      assertTrue(iae.getMessage().contains("value between 0 and 2047"));
+    }
+
+    conf.setBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true);
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, -1);
+    try {
+      PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf,
+          numOutputs, (4500l << 20));
+    } catch (IllegalArgumentException iae) {
+      assertTrue(iae.getMessage().contains(TezRuntimeConfiguration
+          .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB));
+      assertTrue(iae.getMessage().contains("value between 0 and 2047"));
+    }
+
+  }
+
+  @Test
+  //Intentionally not having timeout
+  public void testWithLargeKeyValueWithMinBlockSize() throws IOException {
+    //2 MB key & 2 MB value, 48 MB sort buffer.  block size is 16MB
+    basicTest(1, 5, (2 << 20), (48 * 1024l * 1024l), 16 << 20);
   }
 
   private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException {
+    writeData(sorter, numKeys, keyLen, true);
+  }
+
+  private void writeData(ExternalSorter sorter, int numKeys, int keyLen,
+      boolean autoClose) throws IOException {
     sortedDataMap.clear();
     for (int i = 0; i < numKeys; i++) {
       Text key = new Text(RandomStringUtils.randomAlphanumeric(keyLen));
@@ -471,8 +706,16 @@ public class TestPipelinedSorter {
       sorter.write(key, value);
       sortedDataMap.put(key.toString(), value.toString()); //for verifying data later
     }
-    sorter.flush();
-    sorter.close();
+    if (autoClose) {
+      closeSorter(sorter);
+    }
+  }
+
+  private void closeSorter(ExternalSorter sorter) throws IOException {
+    if (sorter != null) {
+      sorter.flush();
+      sorter.close();
+    }
   }
 
   private void verifyData(IFile.Reader reader)
@@ -481,6 +724,7 @@ public class TestPipelinedSorter {
     Text readValue = new Text();
     DataInputBuffer keyIn = new DataInputBuffer();
     DataInputBuffer valIn = new DataInputBuffer();
+    Configuration conf = getConf();
     SerializationFactory serializationFactory = new SerializationFactory(conf);
     Deserializer<Text> keyDeserializer = serializationFactory.getDeserializer(Text.class);
     Deserializer<Text> valDeserializer = serializationFactory.getDeserializer(Text.class);

http://git-wip-us.apache.org/repos/asf/tez/blob/149a04c2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
index f57731c..fabf52d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
@@ -21,6 +21,7 @@
 package org.apache.tez.runtime.library.conf;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -122,6 +123,10 @@ public class TestOrderedPartitionedKVEdgeConfig {
     fromConf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 0.11f);
     fromConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 123);
     fromConf.set("io.shouldExist", "io");
+    fromConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY,
+        true);
+    fromConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB,
+        1000);
     Map<String, String> additionalConfs = new HashMap<String, String>();
     additionalConfs.put("test.key.2", "key2");
     additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111");
@@ -151,6 +156,10 @@ public class TestOrderedPartitionedKVEdgeConfig {
     Configuration outputConf = rebuiltOutput.conf;
     Configuration inputConf = rebuiltInput.conf;
 
+    assertTrue(outputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY,
+        TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY_DEFAULT));
+    assertEquals(1000, outputConf.getInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 2000));
     assertEquals(3, outputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, 0));
     assertEquals(1111, outputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0));
     assertEquals(2222, outputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));