You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/02/21 02:21:09 UTC
[02/16] tez git commit: TEZ-2085. PipelinedSorter should bail out (on
BufferOverflowException) instead of retrying continuously (rbalamohan)
TEZ-2085. PipelinedSorter should bail out (on BufferOverflowException) instead of retrying continuously (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/94b39568
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/94b39568
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/94b39568
Branch: refs/heads/TEZ-2003
Commit: 94b395685e8685dc077baed64f3b9109e6c2efff
Parents: d788469
Author: Rajesh Balamohan <rb...@hortonworks.com>
Authored: Thu Feb 19 04:27:55 2015 +0530
Committer: Rajesh Balamohan <rb...@hortonworks.com>
Committed: Thu Feb 19 04:27:55 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../common/sort/impl/PipelinedSorter.java | 48 ++++++++++++++++----
.../common/sort/impl/TestPipelinedSorter.java | 40 +++++++++++++++-
3 files changed, 80 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/94b39568/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b1c8c99..351cef1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2085. PipelinedSorter should bail out (on BufferOverflowException) instead of retrying continuously.
TEZ-167. Create tests for MR Combiner.
TEZ-2080. Localclient should be using tezconf in init instead of yarnconf.
TEZ-2072. Add missing Private annotation to createDAG in the DAG API class.
http://git-wip-us.apache.org/repos/asf/tez/blob/94b39568/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index c1a6637..6ad8ebf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -93,9 +93,12 @@ public class PipelinedSorter extends ExternalSorter {
private ListIterator<ByteBuffer> listIterator;
//total memory capacity allocated to sorter
- private long capacity;
+ private final long capacity;
- private static final int BLOCK_SIZE = 1536 << 20;
+ //counts buffer-overflow retries in collect(); bail out once it exceeds the number of buffers
+ private int bufferOverflowRecursion;
+
+ private final int blockSize;
// Merger
@@ -111,15 +114,15 @@ public class PipelinedSorter extends ExternalSorter {
public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
long initialMemoryAvailable) throws IOException {
- this(outputContext,conf,numOutputs, initialMemoryAvailable, BLOCK_SIZE);
+ this(outputContext,conf,numOutputs, initialMemoryAvailable, 0);
}
- public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
- long initialMemoryAvailable, int blockSize) throws IOException {
+ PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
+ long initialMemoryAvailable, int blkSize) throws IOException {
super(outputContext, conf, numOutputs, initialMemoryAvailable);
partitionBits = bitcount(partitions)+1;
-
+
//sanity checks
final long sortmb = this.availableMemoryMb;
indexCacheMemoryLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES,
@@ -127,20 +130,24 @@ public class PipelinedSorter extends ExternalSorter {
// buffers and accounting
long maxMemUsage = sortmb << 20;
- Preconditions.checkArgument(blockSize > 0 && blockSize < Integer.MAX_VALUE,"Block size should be" + " within 1 - Integer.MAX_VALUE" + blockSize);
+
+ this.blockSize = computeBlockSize(blkSize, maxMemUsage);
+
long usage = sortmb << 20;
//Divide total memory into different blocks.
int numberOfBlocks = Math.max(1, (int) Math.ceil(1.0 * usage / blockSize));
LOG.info("Number of Blocks : " + numberOfBlocks
+ ", maxMemUsage=" + maxMemUsage + ", BLOCK_SIZE=" + blockSize);
+ long totalCapacityWithoutMeta = 0;
for (int i = 0; i < numberOfBlocks; i++) {
Preconditions.checkArgument(usage > 0, "usage can't be less than zero " + usage);
long size = Math.min(usage, blockSize);
int sizeWithoutMeta = (int) ((size) - (size % METASIZE));
bufferList.add(ByteBuffer.allocate(sizeWithoutMeta));
- capacity += sizeWithoutMeta;
+ totalCapacityWithoutMeta += sizeWithoutMeta;
usage -= size;
}
+ capacity = totalCapacityWithoutMeta;
listIterator = bufferList.listIterator();
@@ -170,6 +177,20 @@ public class PipelinedSorter extends ExternalSorter {
minSpillsForCombine = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
}
+ @VisibleForTesting
+ static int computeBlockSize(int blkSize, long maxMemUsage) {
+ if (blkSize == 0) {
+ return (int) Math.min(maxMemUsage, Integer.MAX_VALUE);
+ } else {
+ Preconditions.checkArgument(blkSize > 0, "blkSize should be between 1 and Integer.MAX_VALUE");
+ if (blkSize >= maxMemUsage) {
+ return (maxMemUsage > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) maxMemUsage;
+ } else {
+ return blkSize;
+ }
+ }
+ }
+
private int bitcount(int n) {
int bit = 0;
while(n!=0) {
@@ -259,11 +280,22 @@ public class PipelinedSorter extends ExternalSorter {
// restore limit
span.kvbuffer.position(keystart);
this.sort();
+
+ bufferOverflowRecursion++;
+ if (bufferOverflowRecursion > bufferList.size()) {
+ throw new MapBufferTooSmallException("Record too large for in-memory buffer. Exceeded "
+ + "buffer overflow limit, bufferOverflowRecursion=" + bufferOverflowRecursion + ", bufferList"
+ + ".size=" + bufferList.size() + ", blockSize=" + blockSize);
+ }
// try again
this.collect(key, value, partition);
return;
}
+ if (bufferOverflowRecursion > 0) {
+ bufferOverflowRecursion--;
+ }
+
int prefix = 0;
if(hasher != null) {
http://git-wip-us.apache.org/repos/asf/tez/blob/94b39568/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 0a070ff..f8331d6 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -14,6 +14,7 @@ import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.junit.After;
import org.junit.Assert;
@@ -25,6 +26,7 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -102,6 +104,21 @@ public class TestPipelinedSorter {
//# partition, # of keys, size per key, InitialMem, blockSize
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20);
+
+ try {
+ //3 MB key & 3 MB value, whereas block size is just 3 MB
+ basicTest(1, 5, (3 << 20), (10 * 1024l * 1024l), 3 << 20);
+ fail();
+ } catch (IOException ioe) {
+ Assert.assertTrue(
+ ioe.getMessage().contains("Record too large for in-memory buffer."
+ + " Exceeded buffer overflow limit"));
+ }
+
+ //15 MB key & 15 MB value, 48 MB sort buffer. block size is 48MB (or 1 block)
+ //meta would be 16 MB
+ basicTest(1, 5, (15 << 20), (48 * 1024l * 1024l), 48 << 20);
+
}
public void basicTest(int partitions, int numKeys, int keySize,
@@ -130,7 +147,7 @@ public class TestPipelinedSorter {
long size = ExternalSorter.getInitialMemoryRequirement(conf, 4096 * 1024 * 1024l);
Assert.assertTrue(size == (3076l << 20));
- //Verify BLOCK_SIZEs
+ //Verify number of block buffers allocated
this.initialAvailableMem = 10 * 1024 * 1024;
PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
initialAvailableMem, 1 << 20);
@@ -143,6 +160,27 @@ public class TestPipelinedSorter {
sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
initialAvailableMem, 10 << 20);
Assert.assertTrue(sorter.bufferList.size() == 1);
+
+ //Verify block sizes
+ int blockSize = PipelinedSorter.computeBlockSize(0, (10 * 1024 * 1024));
+ //initialAvailableMem is < 2 GB. So consider it as the blockSize
+ Assert.assertTrue(blockSize == (10 * 1024 * 1024));
+
+ blockSize = PipelinedSorter.computeBlockSize(0, (10 * 1024 * 1024 * 1024l));
+ //initialAvailableMem is > 2 GB. Restrict block size to Integer.MAX_VALUE;
+ Assert.assertTrue(blockSize == Integer.MAX_VALUE);
+
+ blockSize = PipelinedSorter.computeBlockSize((1*1024*1024*1024), (10 * 1024 * 1024));
+ //sort buffer is 10 MB. But block size requested is 1 GB. Restrict block size to 10 MB.
+ Assert.assertTrue(blockSize == (10 * 1024 * 1024));
+
+ try {
+ blockSize = PipelinedSorter.computeBlockSize(-1, (10 * 1024 * 1024 * 1024l));
+ //block size can't be set to -1
+ fail();
+ } catch(IllegalArgumentException e) {
+ Assert.assertTrue(e.getMessage().contains("should be between 1 and Integer.MAX_VALUE"));
+ }
}
private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException {