You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/02/21 02:21:09 UTC

[02/16] tez git commit: TEZ-2085. PipelinedSorter should bail out (on BufferOverflowException) instead of retrying continuously (rbalamohan)

TEZ-2085. PipelinedSorter should bail out (on BufferOverflowException) instead of retrying continuously (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/94b39568
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/94b39568
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/94b39568

Branch: refs/heads/TEZ-2003
Commit: 94b395685e8685dc077baed64f3b9109e6c2efff
Parents: d788469
Author: Rajesh Balamohan <rb...@hortonworks.com>
Authored: Thu Feb 19 04:27:55 2015 +0530
Committer: Rajesh Balamohan <rb...@hortonworks.com>
Committed: Thu Feb 19 04:27:55 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../common/sort/impl/PipelinedSorter.java       | 48 ++++++++++++++++----
 .../common/sort/impl/TestPipelinedSorter.java   | 40 +++++++++++++++-
 3 files changed, 80 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/94b39568/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b1c8c99..351cef1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2085. PipelinedSorter should bail out (on BufferOverflowException) instead of retrying continuously.
   TEZ-167. Create tests for MR Combiner.
   TEZ-2080. Localclient should be using tezconf in init instead of yarnconf.
   TEZ-2072. Add missing Private annotation to createDAG in the DAG API class.

http://git-wip-us.apache.org/repos/asf/tez/blob/94b39568/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index c1a6637..6ad8ebf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -93,9 +93,12 @@ public class PipelinedSorter extends ExternalSorter {
   private ListIterator<ByteBuffer> listIterator;
 
   //total memory capacity allocated to sorter
-  private long capacity;
+  private final long capacity;
 
-  private static final int BLOCK_SIZE = 1536 << 20;
+  //track buffer overflow recursively in all buffers
+  private int bufferOverflowRecursion;
+
+  private final int blockSize;
 
 
   // Merger
@@ -111,15 +114,15 @@ public class PipelinedSorter extends ExternalSorter {
 
   public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
       long initialMemoryAvailable) throws IOException {
-    this(outputContext,conf,numOutputs, initialMemoryAvailable, BLOCK_SIZE);
+    this(outputContext,conf,numOutputs, initialMemoryAvailable, 0);
   }
 
-  public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
-      long initialMemoryAvailable, int blockSize) throws IOException {
+  PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
+      long initialMemoryAvailable, int blkSize) throws IOException {
     super(outputContext, conf, numOutputs, initialMemoryAvailable);
     
     partitionBits = bitcount(partitions)+1;
-   
+
     //sanity checks
     final long sortmb = this.availableMemoryMb;
     indexCacheMemoryLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES,
@@ -127,20 +130,24 @@ public class PipelinedSorter extends ExternalSorter {
 
     // buffers and accounting
     long maxMemUsage = sortmb << 20;
-    Preconditions.checkArgument(blockSize > 0 && blockSize < Integer.MAX_VALUE,"Block size should be" + " within 1 - Integer.MAX_VALUE" + blockSize);
+
+    this.blockSize = computeBlockSize(blkSize, maxMemUsage);
+
     long usage = sortmb << 20;
     //Divide total memory into different blocks.
     int numberOfBlocks = Math.max(1, (int) Math.ceil(1.0 * usage / blockSize));
     LOG.info("Number of Blocks : " + numberOfBlocks
         + ", maxMemUsage=" + maxMemUsage + ", BLOCK_SIZE=" + blockSize);
+    long totalCapacityWithoutMeta = 0;
     for (int i = 0; i < numberOfBlocks; i++) {
       Preconditions.checkArgument(usage > 0, "usage can't be less than zero " + usage);
       long size = Math.min(usage, blockSize);
       int sizeWithoutMeta = (int) ((size) - (size % METASIZE));
       bufferList.add(ByteBuffer.allocate(sizeWithoutMeta));
-      capacity += sizeWithoutMeta;
+      totalCapacityWithoutMeta += sizeWithoutMeta;
       usage -= size;
     }
+    capacity = totalCapacityWithoutMeta;
     listIterator = bufferList.listIterator();
 
 
@@ -170,6 +177,20 @@ public class PipelinedSorter extends ExternalSorter {
     minSpillsForCombine = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
   }
 
+  @VisibleForTesting
+  static int computeBlockSize(int blkSize, long maxMemUsage) {
+    if (blkSize == 0) {
+      return (int) Math.min(maxMemUsage, Integer.MAX_VALUE);
+    } else {
+      Preconditions.checkArgument(blkSize > 0, "blkSize should be between 1 and Integer.MAX_VALUE");
+      if (blkSize >= maxMemUsage) {
+        return (maxMemUsage > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) maxMemUsage;
+      } else {
+        return blkSize;
+      }
+    }
+  }
+
   private int bitcount(int n) {
     int bit = 0;
     while(n!=0) {
@@ -259,11 +280,22 @@ public class PipelinedSorter extends ExternalSorter {
       // restore limit
       span.kvbuffer.position(keystart);
       this.sort();
+
+      bufferOverflowRecursion++;
+      if (bufferOverflowRecursion > bufferList.size()) {
+        throw new MapBufferTooSmallException("Record too large for in-memory buffer. Exceeded "
+            + "buffer overflow limit, bufferOverflowRecursion=" + bufferOverflowRecursion + ", bufferList"
+            + ".size=" + bufferList.size() + ", blockSize=" + blockSize);
+      }
       // try again
       this.collect(key, value, partition);
       return;
     }
 
+    if (bufferOverflowRecursion > 0) {
+      bufferOverflowRecursion--;
+    }
+
     int prefix = 0;
 
     if(hasher != null) {

http://git-wip-us.apache.org/repos/asf/tez/blob/94b39568/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 0a070ff..f8331d6 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -14,6 +14,7 @@ import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.junit.After;
 import org.junit.Assert;
@@ -25,6 +26,7 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
 
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
@@ -102,6 +104,21 @@ public class TestPipelinedSorter {
     //# partition, # of keys, size per key, InitialMem, blockSize
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
     basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20);
+
+    try {
+      //3 MB key & 3 MB value, whereas block size is just 3 MB
+      basicTest(1, 5, (3 << 20), (10 * 1024l * 1024l), 3 << 20);
+      fail();
+    } catch (IOException ioe) {
+      Assert.assertTrue(
+          ioe.getMessage().contains("Record too large for in-memory buffer."
+              + " Exceeded buffer overflow limit"));
+    }
+
+    //15 MB key & 15 MB value, 48 MB sort buffer.  block size is 48MB (or 1 block)
+    //meta would be 16 MB
+    basicTest(1, 5, (15 << 20), (48 * 1024l * 1024l), 48 << 20);
+
   }
 
   public void basicTest(int partitions, int numKeys, int keySize,
@@ -130,7 +147,7 @@ public class TestPipelinedSorter {
     long size = ExternalSorter.getInitialMemoryRequirement(conf, 4096 * 1024 * 1024l);
     Assert.assertTrue(size == (3076l << 20));
 
-    //Verify BLOCK_SIZEs
+    //Verify number of block buffers allocated
     this.initialAvailableMem = 10 * 1024 * 1024;
     PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
         initialAvailableMem, 1 << 20);
@@ -143,6 +160,27 @@ public class TestPipelinedSorter {
     sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
         initialAvailableMem, 10 << 20);
     Assert.assertTrue(sorter.bufferList.size() == 1);
+
+    //Verify block sizes
+    int blockSize = PipelinedSorter.computeBlockSize(0, (10 * 1024 * 1024));
+    //initialAvailableMem is < 2 GB. So consider it as the blockSize
+    Assert.assertTrue(blockSize == (10 * 1024 * 1024));
+
+    blockSize = PipelinedSorter.computeBlockSize(0, (10 * 1024 * 1024 * 1024l));
+    //initialAvailableMem is > 2 GB. Restrict block size to Integer.MAX_VALUE;
+    Assert.assertTrue(blockSize == Integer.MAX_VALUE);
+
+    blockSize = PipelinedSorter.computeBlockSize((1*1024*1024*1024), (10 * 1024 * 1024));
+    //sort buffer is 10 MB. But block size requested is 1 GB. Restrict block size to 10 MB.
+    Assert.assertTrue(blockSize == (10 * 1024 * 1024));
+
+    try {
+      blockSize = PipelinedSorter.computeBlockSize(-1, (10 * 1024 * 1024 * 1024l));
+      //block size can't be set to -1
+      fail();
+    } catch(IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("should be between 1 and Integer.MAX_VALUE"));
+    }
   }
 
   private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException {