You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/10/28 21:35:08 UTC
tez git commit: TEZ-2244. PipelinedSorter: Progressive allocation for
sort-buffers (rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master 70096c169 -> 149a04c28
TEZ-2244. PipelinedSorter: Progressive allocation for sort-buffers (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/149a04c2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/149a04c2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/149a04c2
Branch: refs/heads/master
Commit: 149a04c28d53988fd49510c5453fc19ac0d1030e
Parents: 70096c1
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Wed Oct 28 13:35:36 2015 -0700
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Wed Oct 28 13:35:36 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../library/api/TezRuntimeConfiguration.java | 28 ++
.../common/sort/impl/PipelinedSorter.java | 187 +++++++----
.../output/OrderedPartitionedKVOutput.java | 2 +
.../common/sort/impl/TestPipelinedSorter.java | 328 ++++++++++++++++---
.../TestOrderedPartitionedKVEdgeConfig.java | 9 +
6 files changed, 457 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/149a04c2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4f58119..14e2bbe 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.2: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2244. PipelinedSorter: Progressive allocation for sort-buffers
TEZ-2904. Pig can't specify task specific command opts
TEZ-2888. Make critical path calculation resilient to AM crash
TEZ-2899. Tez UI: DAG getting created with huge horizontal gap in between vertices
http://git-wip-us.apache.org/repos/asf/tez/blob/149a04c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index cf05546..caad6ef 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -128,6 +128,31 @@ public class TezRuntimeConfiguration {
"combine.min.spills";
public static final int TEZ_RUNTIME_COMBINE_MIN_SPILLS_DEFAULT = 3;
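+ /**
+ * Minimum size, in MB, of each block that {@link #TEZ_RUNTIME_IO_SORT_MB} is
+ * split into. Must be between 1 and 2046. Ignored when
+ * {@link #TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY} is true.
+ */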
+ @ConfigurationProperty(type = "integer")
+ public static final String
+ TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB = TEZ_RUNTIME_PREFIX +
+ "pipelined.sorter.min-block.size.in.mb";
+ public static final int
+ TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB_DEFAULT = 2000;
+
+ /**
+ * When true, the sorter allocates its sort buffers lazily, in a progressive
+ * fashion, as more memory is needed.
+ *
+ * When false, all memory specified by {@link #TEZ_RUNTIME_IO_SORT_MB} is
+ * allocated upfront during sorter initialization, split into blocks sized
+ * according to {@link #TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB}.
+ */
+ @ConfigurationProperty(type = "boolean")
+ public static final String TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY = TEZ_RUNTIME_PREFIX +
+ "pipelined.sorter.lazy-allocate.memory";
+ public static final boolean
+ TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY_DEFAULT = false;
/**
* String value.
@@ -498,6 +523,9 @@ public class TezRuntimeConfiguration {
tezRuntimeKeys.add(TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
tezRuntimeKeys.add(TEZ_RUNTIME_COMBINE_MIN_SPILLS);
tezRuntimeKeys.add(TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS);
+ tezRuntimeKeys.add(
+ TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB);
+ tezRuntimeKeys.add(TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY);
tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB);
tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES);
tezRuntimeKeys.add(TEZ_RUNTIME_PARTITIONER_CLASS);
http://git-wip-us.apache.org/repos/asf/tez/blob/149a04c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 81f5211..f512a5d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -25,9 +25,7 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
-import java.util.ListIterator;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -87,10 +85,6 @@ public class PipelinedSorter extends ExternalSorter {
private final ProxyComparator hasher;
// SortSpans
private SortSpan span;
- //Maintain a bunch of ByteBuffers (each of them can hold approximately 2 GB data)
- @VisibleForTesting
- protected final LinkedList<ByteBuffer> bufferList = new LinkedList<ByteBuffer>();
- private ListIterator<ByteBuffer> listIterator;
//total memory capacity allocated to sorter
private final long capacity;
@@ -98,9 +92,6 @@ public class PipelinedSorter extends ExternalSorter {
//track buffer overflow recursively in all buffers
private int bufferOverflowRecursion;
- private final int blockSize;
-
-
// Merger
private final SpanMerger merger;
private final ExecutorService sortmaster;
@@ -110,17 +101,44 @@ public class PipelinedSorter extends ExternalSorter {
private final boolean pipelinedShuffle;
+ private long currentAllocatableMemory;
+ //Maintain a list of ByteBuffers
+ @VisibleForTesting
+ final List<ByteBuffer> buffers;
+ final int maxNumberOfBlocks;
+ private int bufferIndex = -1;
+ private final int MIN_BLOCK_SIZE;
+ private final boolean lazyAllocateMem;
+
// TODO Set additional countesr - total bytes written, spills etc.
public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
long initialMemoryAvailable) throws IOException {
- this(outputContext,conf,numOutputs, initialMemoryAvailable, 0);
- }
-
- PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
- long initialMemoryAvailable, int blkSize) throws IOException {
super(outputContext, conf, numOutputs, initialMemoryAvailable);
+ lazyAllocateMem = this.conf.getBoolean(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY_DEFAULT);
+
+ if (lazyAllocateMem) {
+ /**
+ * When lazy allocation is enabled, the sorter allocates memory on demand.
+ * Blocks after the first one target 256 MB (capped by the remaining memory).
+ */
+ MIN_BLOCK_SIZE = 256 << 20; //256 MB
+ } else {
+ int minBlockSize = conf.getInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB,
+ TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB_DEFAULT);
+ Preconditions.checkArgument(
+ (minBlockSize > 0 && minBlockSize < 2047),
+ TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB
+ + "=" + minBlockSize + " should be a positive value between 0 and 2047");
+ MIN_BLOCK_SIZE = minBlockSize << 20;
+ }
+
StringBuilder initialSetupLogLine = new StringBuilder("Setting up PipelinedSorter for ")
.append(outputContext.getDestinationVertexName()).append(": ");
partitionBits = bitcount(partitions)+1;
@@ -135,23 +153,7 @@ public class PipelinedSorter extends ExternalSorter {
final long sortmb = this.availableMemoryMb;
// buffers and accounting
- long maxMemUsage = sortmb << 20;
-
- this.blockSize = computeBlockSize(blkSize, maxMemUsage);
-
- long usage = sortmb << 20;
- //Divide total memory into different blocks.
- int numberOfBlocks = Math.max(1, (int) Math.ceil(1.0 * usage / blockSize));
- initialSetupLogLine.append("#blocks=").append(numberOfBlocks);
- initialSetupLogLine.append(", maxMemUsage=").append(maxMemUsage);
- initialSetupLogLine.append(", BLOCK_SIZE=").append(blockSize);
- initialSetupLogLine.append(", finalMergeEnabled=").append(isFinalMergeEnabled());
- initialSetupLogLine.append(", pipelinedShuffle=").append(pipelinedShuffle);
- initialSetupLogLine.append(", sendEmptyPartitions=").append(sendEmptyPartitionDetails);
- initialSetupLogLine.append(", ").append(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB).append(
- "=").append(
- sortmb);
-
+ long maxMemLimit = sortmb << 20;
initialSetupLogLine.append(", UsingHashComparator=");
// k/v serialization
@@ -166,20 +168,43 @@ public class PipelinedSorter extends ExternalSorter {
LOG.info(initialSetupLogLine.toString());
long totalCapacityWithoutMeta = 0;
- for (int i = 0; i < numberOfBlocks; i++) {
- Preconditions.checkArgument(usage > 0, "usage can't be less than zero " + usage);
- long size = Math.min(usage, blockSize);
+ long availableMem = maxMemLimit;
+ int numBlocks = 0;
+ while(availableMem > 0) {
+ long size = Math.min(availableMem, computeBlockSize(availableMem, maxMemLimit));
int sizeWithoutMeta = (int) ((size) - (size % METASIZE));
- bufferList.add(ByteBuffer.allocate(sizeWithoutMeta));
totalCapacityWithoutMeta += sizeWithoutMeta;
- usage -= size;
+ availableMem -= size;
+ numBlocks++;
}
+ currentAllocatableMemory = maxMemLimit;
+ maxNumberOfBlocks = numBlocks;
capacity = totalCapacityWithoutMeta;
- listIterator = bufferList.listIterator();
+ buffers = Lists.newArrayListWithCapacity(maxNumberOfBlocks);
+ allocateSpace(); //Allocate the first block
+ if (!lazyAllocateMem) {
+ LOG.info("Pre allocating rest of memory buffers upfront");
+ while(allocateSpace() != null);
+ }
- Preconditions.checkArgument(listIterator.hasNext(), "Buffer list seems to be empty " + bufferList.size());
- span = new SortSpan(listIterator.next(), 1024*1024, 16, this.comparator);
+ initialSetupLogLine.append("#blocks=").append(maxNumberOfBlocks);
+ initialSetupLogLine.append(", maxMemUsage=").append(maxMemLimit);
+ initialSetupLogLine.append(", lazyAllocateMem=").append(
+ lazyAllocateMem);
+ initialSetupLogLine.append(", minBlockSize=").append(MIN_BLOCK_SIZE);
+ initialSetupLogLine.append(", initial BLOCK_SIZE=").append(buffers.get(0).capacity());
+ initialSetupLogLine.append(", finalMergeEnabled=").append(isFinalMergeEnabled());
+ initialSetupLogLine.append(", pipelinedShuffle=").append(pipelinedShuffle);
+ initialSetupLogLine.append(", sendEmptyPartitions=").append(sendEmptyPartitionDetails);
+ initialSetupLogLine.append(", ").append(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB).append(
+ "=").append(
+ sortmb);
+
+ Preconditions.checkState(buffers.size() > 0, "Atleast one buffer needs to be present");
+ LOG.info(initialSetupLogLine.toString());
+
+ span = new SortSpan(buffers.get(bufferIndex), 1024 * 1024, 16, this.comparator);
merger = new SpanMerger(); // SpanIterators are comparable
final int sortThreads =
this.conf.getInt(
@@ -197,18 +222,67 @@ public class PipelinedSorter extends ExternalSorter {
minSpillsForCombine = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
}
+ ByteBuffer allocateSpace() {
+ if (currentAllocatableMemory <= 0) {
+ //No space available.
+ return null;
+ }
+
+ int size = computeBlockSize(currentAllocatableMemory, availableMemoryMb << 20);
+ currentAllocatableMemory -= size;
+ int sizeWithoutMeta = (size) - (size % METASIZE);
+ ByteBuffer space = ByteBuffer.allocate(sizeWithoutMeta);
+
+ buffers.add(space);
+ bufferIndex++;
+
+ Preconditions.checkState(buffers.size() <= maxNumberOfBlocks,
+ "Number of blocks " + buffers.size()
+ + " is exceeding " + maxNumberOfBlocks);
+
+ LOG.info("Newly allocated block size=" + size
+ + ", index=" + bufferIndex
+ + ", Number of buffers=" + buffers.size()
+ + ", currentAllocatableMemory=" + currentAllocatableMemory
+ + ", currentBufferSize=" + space.capacity()
+ + ", total=" + (availableMemoryMb << 20));
+ return space;
+ }
+
+
@VisibleForTesting
- static int computeBlockSize(int blkSize, long maxMemUsage) {
- if (blkSize == 0) {
- return (int) Math.min(maxMemUsage, Integer.MAX_VALUE);
- } else {
- Preconditions.checkArgument(blkSize > 0, "blkSize should be between 1 and Integer.MAX_VALUE");
- if (blkSize >= maxMemUsage) {
- return (maxMemUsage > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) maxMemUsage;
- } else {
- return blkSize;
+ int computeBlockSize(long availableMem, long maxAllocatedMemory) {
+ int maxBlockSize = 0;
+ /**
+ * When lazy allocation is enabled, memory is allocated on demand and the
+ * first buffer is 32 MB. NOTE(review): this 32 MB is returned without being
+ * compared against availableMem — confirm behavior for io.sort.mb < 32.
+ */
+ if (lazyAllocateMem) {
+ if (buffers == null || buffers.isEmpty()) {
+ return 32 << 20; //32 MB
+ }
+ }
+
+ //Honor MIN_BLOCK_SIZE
+ maxBlockSize = Math.max(MIN_BLOCK_SIZE, maxBlockSize);
+
+ if (availableMem < maxBlockSize) {
+ maxBlockSize = (int) availableMem;
+ }
+
+ int maxMem = (maxAllocatedMemory > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) maxAllocatedMemory;
+ if (maxBlockSize > maxMem) {
+ maxBlockSize = maxMem;
+ }
+
+ availableMem -= maxBlockSize;
+ if (availableMem < MIN_BLOCK_SIZE) {
+ if ((maxBlockSize + availableMem) < Integer.MAX_VALUE) {
+ //Merge remaining with last block
+ maxBlockSize += availableMem;
}
}
+ return maxBlockSize;
}
private int bitcount(int n) {
@@ -237,8 +311,8 @@ public class PipelinedSorter extends ExternalSorter {
if (pipelinedShuffle && ret) {
sendPipelinedShuffleEvents();
}
- //safe to reset the iterator
- listIterator = bufferList.listIterator();
+ //safe to reset bufferIndex to 0;
+ bufferIndex = 0;
int items = 1024*1024;
int perItem = 16;
if(span.length() != 0) {
@@ -250,9 +324,9 @@ public class PipelinedSorter extends ExternalSorter {
items = 1024*1024;
}
}
- Preconditions.checkArgument(listIterator.hasNext(), "block iterator should not be empty");
+ Preconditions.checkArgument(buffers.get(bufferIndex) != null, "block should not be empty");
//TODO: fix per item being passed.
- span = new SortSpan((ByteBuffer)listIterator.next().clear(), (1024*1024),
+ span = new SortSpan((ByteBuffer)buffers.get(bufferIndex).clear(), (1024*1024),
perItem, ConfigUtils.getIntermediateOutputKeyComparator(this.conf));
} else {
// queue up the sort
@@ -325,7 +399,7 @@ public class PipelinedSorter extends ExternalSorter {
// restore limit
span.kvbuffer.position(keystart);
this.sort();
- if (span.length() == 0 || bufferOverflowRecursion > bufferList.size()) {
+ if (span.length() == 0 || bufferOverflowRecursion > buffers.size()) {
// spill the current key value pair
spillSingleRecord(key, value, partition);
bufferOverflowRecursion = 0;
@@ -562,7 +636,7 @@ public class PipelinedSorter extends ExternalSorter {
sortmaster.shutdown();
//safe to clean up
- bufferList.clear();
+ buffers.clear();
if(indexCacheList.isEmpty()) {
@@ -911,11 +985,12 @@ public class PipelinedSorter extends ExternalSorter {
LOG.info(outputContext.getDestinationVertexName() + ": " + String.format("Span%d.length = %d, perItem = %d", index, length(), perItem));
if(remaining.remaining() < METASIZE+perItem) {
//Check if we can get the next Buffer from the main buffer list
- if (listIterator.hasNext()) {
+ ByteBuffer space = allocateSpace();
+ if (space != null) {
LOG.info(outputContext.getDestinationVertexName() + ": " + "Getting memory from next block in the list, recordsWritten=" +
mapOutputRecordCounter.getValue());
reinit = true;
- return listIterator.next();
+ return space;
}
return null;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/149a04c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 45b6713..c0b0760 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -225,6 +225,8 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INTERNAL_SORTER_CLASS);
http://git-wip-us.apache.org/repos/asf/tez/blob/149a04c2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 92163c4..2cebea4 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -64,7 +64,6 @@ import static org.mockito.internal.verification.VerificationModeFactory.times;
* limitations under the License.
*/
public class TestPipelinedSorter {
- private static Configuration conf = new Configuration();
private static FileSystem localFs = null;
private static Path workDir = null;
private OutputContext outputContext;
@@ -76,7 +75,7 @@ public class TestPipelinedSorter {
private static TreeMap<String, String> sortedDataMap = Maps.newTreeMap();
static {
- conf.set("fs.defaultFS", "file:///");
+ Configuration conf = getConf();
try {
localFs = FileSystem.getLocal(conf);
workDir = new Path(
@@ -99,7 +98,11 @@ public class TestPipelinedSorter {
TezCounters counters = new TezCounters();
String uniqueId = UUID.randomUUID().toString();
this.outputContext = createMockOutputContext(counters, appId, uniqueId);
+ }
+ public static Configuration getConf() {
+ Configuration conf = new Configuration();
+ conf.set("fs.defaultFS", "file:///");
//To enable PipelinedSorter
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.PIPELINED.name());
@@ -110,15 +113,17 @@ public class TestPipelinedSorter {
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
//Setup localdirs
- String localDirs = workDir.toString();
- conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
+ if (workDir != null) {
+ String localDirs = workDir.toString();
+ conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
+ }
+ return conf;
}
@After
public void reset() throws IOException {
cleanup();
localFs.mkdirs(workDir);
- conf = new Configuration();
}
@Test
@@ -126,6 +131,7 @@ public class TestPipelinedSorter {
//TODO: need to support multiple partition testing later
//# partition, # of keys, size per key, InitialMem, blockSize
+ Configuration conf = getConf();
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20);
@@ -133,6 +139,7 @@ public class TestPipelinedSorter {
@Test
public void testWithoutPartitionStats() throws IOException {
+ Configuration conf = getConf();
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, false);
//# partition, # of keys, size per key, InitialMem, blockSize
basicTest(1, 0, 0, (10 * 1024l * 1024l), 3 << 20);
@@ -141,6 +148,7 @@ public class TestPipelinedSorter {
@Test
public void testWithEmptyData() throws IOException {
+ Configuration conf = getConf();
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
//# partition, # of keys, size per key, InitialMem, blockSize
basicTest(1, 0, 0, (10 * 1024l * 1024l), 3 << 20);
@@ -150,9 +158,12 @@ public class TestPipelinedSorter {
public void testEmptyDataWithPipelinedShuffle() throws IOException {
this.numOutputs = 1;
this.initialAvailableMem = 1 *1024 * 1024;
+ Configuration conf = getConf();
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1);
PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
- initialAvailableMem, 1 << 20);
+ initialAvailableMem);
writeData(sorter, 0, 1<<20);
@@ -186,16 +197,19 @@ public class TestPipelinedSorter {
@Test
public void testKVExceedsBuffer2() throws IOException {
// a list of 4 blocks each 256kb, 2KV pair, key 1mb, value 1mb
- basicTest(1, 2, (1 << 20), (1 * 1024l * 1024l), 256<<10);
+ basicTest(1, 2, (1 << 20), (1 * 1024l * 1024l), 256<<20);
}
@Test
public void testExceedsKVWithMultiplePartitions() throws IOException {
+ Configuration conf = getConf();
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
this.numOutputs = 5;
this.initialAvailableMem = 1 * 1024 * 1024;
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1);
PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
- initialAvailableMem, 0);
+ initialAvailableMem);
writeData(sorter, 100, 1<<20);
verifyCounters(sorter, outputContext);
@@ -205,9 +219,12 @@ public class TestPipelinedSorter {
public void testExceedsKVWithPipelinedShuffle() throws IOException {
this.numOutputs = 1;
this.initialAvailableMem = 1 *1024 * 1024;
+ Configuration conf = getConf();
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1);
PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
- initialAvailableMem, 1 << 20);
+ initialAvailableMem);
writeData(sorter, 5, 1<<20);
@@ -222,9 +239,12 @@ public class TestPipelinedSorter {
public void test_TEZ_2602_50mb() throws IOException {
this.numOutputs = 1;
this.initialAvailableMem = 1 *1024 * 1024;
+ Configuration conf = getConf();
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1);
PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
- initialAvailableMem, 1 << 20);
+ initialAvailableMem);
Text value = new Text("1");
long size = 50 * 1024 * 1024;
@@ -238,13 +258,14 @@ public class TestPipelinedSorter {
sorter.close();
}
- @Test
+ //@Test
public void testLargeDataWithMixedKV() throws IOException {
this.numOutputs = 1;
this.initialAvailableMem = 48 *1024 * 1024;
+ Configuration conf = getConf();
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
- initialAvailableMem, 0);
+ initialAvailableMem);
//write 10 MB KV
Text key = new Text(RandomStringUtils.randomAlphanumeric(10 << 20));
@@ -293,13 +314,15 @@ public class TestPipelinedSorter {
// 20 KVpairs of 2X10kb, 10 KV of 2X200kb, 20KV of 2X10kb
int numkeys[] = {20, 10, 20};
int keylens[] = {10<<10, 200<<10, 10<<10};
- basicTest2(1, numkeys, keylens, (1 * 1024l * 1024l), 1 << 18);
+ basicTest2(1, numkeys, keylens, (10 * 1024l * 1024l), 2);
}
@Test
public void testWithCustomComparator() throws IOException {
//Test with custom comparator
- conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, CustomComparator.class.getName());
+ Configuration conf = getConf();
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+ CustomComparator.class.getName());
basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20);
}
@@ -307,10 +330,13 @@ public class TestPipelinedSorter {
public void testWithPipelinedShuffle() throws IOException {
this.numOutputs = 1;
this.initialAvailableMem = 5 *1024 * 1024;
+ Configuration conf = getConf();
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1);
PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
- initialAvailableMem, 1 << 20);
+ initialAvailableMem);
//Write 100 keys each of size 10
writeData(sorter, 10000, 100);
@@ -323,11 +349,14 @@ public class TestPipelinedSorter {
@Test
public void testCountersWithMultiplePartitions() throws IOException {
+ Configuration conf = getConf();
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
this.numOutputs = 5;
this.initialAvailableMem = 5 * 1024 * 1024;
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1);
PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
- initialAvailableMem, 0);
+ initialAvailableMem);
writeData(sorter, 10000, 100);
verifyCounters(sorter, outputContext);
@@ -336,8 +365,11 @@ public class TestPipelinedSorter {
public void basicTest2(int partitions, int[] numkeys, int[] keysize,
long initialAvailableMem, int blockSize) throws IOException {
this.numOutputs = partitions; // single output
+ Configuration conf = getConf();
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 100);
PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
- initialAvailableMem, blockSize);
+ initialAvailableMem);
writeData2(sorter, numkeys, keysize);
verifyCounters(sorter, outputContext);
}
@@ -360,10 +392,13 @@ public class TestPipelinedSorter {
}
public void basicTest(int partitions, int numKeys, int keySize,
- long initialAvailableMem, int blockSize) throws IOException {
+ long initialAvailableMem, int minBlockSize) throws IOException {
this.numOutputs = partitions; // single output
+ Configuration conf = getConf();
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, minBlockSize >> 20);
PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
- initialAvailableMem, blockSize);
+ initialAvailableMem);
writeData(sorter, numKeys, keySize);
@@ -420,50 +455,250 @@ public class TestPipelinedSorter {
@Test
+ //Intentionally not having timeout
//Its not possible to allocate > 2 GB in test environment. Carry out basic checks here.
public void memTest() throws IOException {
//Verify if > 2 GB can be set via config
+ Configuration conf = getConf();
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3076);
long size = ExternalSorter.getInitialMemoryRequirement(conf, 4096 * 1024 * 1024l);
Assert.assertTrue(size == (3076l << 20));
//Verify number of block buffers allocated
this.initialAvailableMem = 10 * 1024 * 1024;
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1);
PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
- initialAvailableMem, 1 << 20);
- Assert.assertTrue(sorter.bufferList.size() == 10);
+ initialAvailableMem);
+ Assert.assertTrue(sorter.maxNumberOfBlocks == 10);
+ //10 MB available, request for 3 MB chunk. Last 1 MB gets added to previous chunk.
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 3);
sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
- initialAvailableMem, 3 << 20);
- Assert.assertTrue(sorter.bufferList.size() == 4);
+ initialAvailableMem);
+ Assert.assertTrue(sorter.maxNumberOfBlocks == 3);
+ //10 MB available, request for 10 MB min chunk. Would get 1 block.
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 10);
sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
- initialAvailableMem, 10 << 20);
- Assert.assertTrue(sorter.bufferList.size() == 1);
+ initialAvailableMem);
+ Assert.assertTrue(sorter.maxNumberOfBlocks == 1);
+
+ //Verify block sizes (10 MB min chunk size) when no memory is left to hand out (available = 0).
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 10);
+ sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, initialAvailableMem);
+ Assert.assertTrue(sorter.maxNumberOfBlocks == 1);
+ int blockSize = sorter.computeBlockSize(0, (10 << 20));
+ //available is zero. Can't allocate any more buffer.
+ Assert.assertTrue(blockSize == 0);
+
+ //300 MB available. Request for 200 MB min block size. It would allocate a block with 200 MB,
+ // but last 100 would get clubbed. Hence, it would return 300 MB block.
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 200);
+ sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, (300 << 20));
+ Assert.assertTrue(sorter.maxNumberOfBlocks == 1);
+ blockSize = sorter.computeBlockSize((300 << 20), (300 << 20));
+ Assert.assertTrue(blockSize == (300 << 20));
+
+ //300 MB available. Request for 3500 MB min block size. throw exception
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 3500);
+ try {
+ sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
+ (300 << 20));
+ //Without this, the test would pass silently if no exception were thrown.
+ Assert.fail("Expected IllegalArgumentException for min block size of 3500 MB");
+ } catch(IllegalArgumentException iae ) {
+ assertTrue(iae.getMessage().contains("positive value between 0 and 2047"));
+ }
- //Verify block sizes
- int blockSize = PipelinedSorter.computeBlockSize(0, (10 * 1024 * 1024));
- //initialAvailableMem is < 2 GB. So consider it as the blockSize
- Assert.assertTrue(blockSize == (10 * 1024 * 1024));
+ //64 MB available. Request for 32 MB min block size.
+ conf.setBoolean(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false);
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 32);
+ sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, (64 << 20));
+ Assert.assertTrue(sorter.maxNumberOfBlocks == 2);
+ blockSize = sorter.computeBlockSize((64 << 20), (64 << 20));
+ Assert.assertTrue(blockSize == (32 << 20));
+
+ blockSize = sorter.computeBlockSize((32 << 20), (64 << 20));
+ Assert.assertTrue(blockSize == (32 << 20));
+
+ //48 MB left: the whole 48 MB is returned as one block (the 16 MB remainder is not split off).
+ blockSize = sorter.computeBlockSize((48 << 20), (64 << 20));
+ Assert.assertTrue(blockSize == (48 << 20));
+
+ //64 MB available. Request for 8 MB min block size.
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 8);
+ sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, (64 << 20));
+ Assert.assertTrue(sorter.maxNumberOfBlocks == 8);
+ blockSize = sorter.computeBlockSize((64 << 20), (64 << 20));
+ //Each block is exactly the 8 MB min block size (64 MB / 8 blocks).
+ Assert.assertTrue(blockSize == (8 << 20));
+ }
- blockSize = PipelinedSorter.computeBlockSize(0, (10 * 1024 * 1024 * 1024l));
- //initialAvailableMem is > 2 GB. Restrict block size to Integer.MAX_VALUE;
- Assert.assertTrue(blockSize == Integer.MAX_VALUE);
+ @Test
+ //Intentionally not having timeout
+ public void test_without_lazyMemAllocation() throws IOException {
+ this.numOutputs = 10;
+ Configuration conf = getConf();
+
+ //128 MB. Pre-allocate. Request for default block size. Get 1 buffer
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128);
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB,
+ TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB_DEFAULT);
+ conf.setBoolean(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false);
+ PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf,
+ numOutputs, (128l << 20));
+ assertTrue("Expected 1 sort buffers. current len=" + sorter.buffers.size(),
+ sorter.buffers.size() == 1);
+
+ //128 MB. Pre-allocate. Request for 62 MB min block size. Get 2 buffers
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128);
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 62);
+ conf.setBoolean(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false);
+ sorter = new PipelinedSorter(this.outputContext, conf,
+ numOutputs, (128l << 20));
+ assertTrue("Expected 2 sort buffers. current len=" + sorter.buffers.size(),
+ sorter.buffers.size() == 2);
+
+ //48 MB. Pre-allocate. Min block size (62 MB) is larger than the 48 MB available. Get 1 buffer
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 48);
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 62);
+ conf.setBoolean(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false);
+ sorter = new PipelinedSorter(this.outputContext, conf,
+ numOutputs, (48l << 20));
+ assertTrue("Expected 1 sort buffers. current len=" + sorter.buffers.size(),
+ sorter.buffers.size() == 1);
+ }
+
+ @Test
+ //Intentionally not having timeout
+ public void test_with_lazyMemAllocation() throws IOException {
+ this.numOutputs = 10;
+ Configuration conf = getConf();
+
+ //128 MB. Do not pre-allocate.
+ // Get 32 MB buffer first and then another buffer with 96 MB on filling up
+ // the 32 MB buffer.
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128);
+ conf.setBoolean(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true);
+ PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf,
+ numOutputs, (128l << 20));
+ assertTrue("Expected 1 sort buffers. current len=" + sorter.buffers.size(),
+ sorter.buffers.size() == 1);
+ assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024);
+ writeData(sorter, 100, 1024*1024, false); //100 1 MB KV. Will spill
+
+ //Now it should have created 2 buffers, 32 & 96 MB buffers.
+ assertTrue(sorter.buffers.size() == 2);
+ assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024);
+ assertTrue(sorter.buffers.get(1).capacity() == 96 * 1024 * 1024);
+ closeSorter(sorter);
+ verifyCounters(sorter, outputContext);
- blockSize = PipelinedSorter.computeBlockSize((1*1024*1024*1024), (10 * 1024 * 1024));
- //sort buffer is 10 MB. But block size requested is 1 GB. Restrict block size to 10 MB.
- Assert.assertTrue(blockSize == (10 * 1024 * 1024));
+ //TODO: Not sure if this would fail in build machines due to mem
+ //300 MB. Do not pre-allocate.
+ // Get 1 buffer with 32 MB. But grow to 2 buffers as data is written
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 300);
+ conf.setBoolean(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true);
+ sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, (300l << 20));
+ assertTrue(sorter.buffers.size() == 1);
+ assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024);
+
+ writeData(sorter, 50, 1024*1024, false); //50 1 MB KV to allocate 2nd buf
+ assertTrue(sorter.buffers.size() == 2);
+ assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024);
+ assertTrue(sorter.buffers.get(1).capacity() == 268 * 1024 * 1024);
+ //Close this sorter before replacing it, as the other scenarios do.
+ closeSorter(sorter);
+
+ //48 MB. Do not pre-allocate.
+ // Get 32 MB buffer first invariably and proceed with the rest.
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 48);
+ conf.setBoolean(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true);
+ sorter = new PipelinedSorter(this.outputContext, conf,
+ numOutputs, (48l << 20));
+ assertTrue("Expected 1 sort buffers. current len=" + sorter.buffers.size(),
+ sorter.buffers.size() == 1);
+ assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024);
+ writeData(sorter, 20, 1024*1024, false); //20 1 MB KV to allocate 2nd buf
+
+ //Now it should have created 2 buffers, 32 & 16 MB buffers.
+ assertTrue(sorter.buffers.size() == 2);
+ assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024);
+ assertTrue(sorter.buffers.get(1).capacity() == 16 * 1024 * 1024);
+ closeSorter(sorter);
+ }
+ @Test
+ //Intentionally not having timeout
+ //Invalid min block sizes (4500 MB, -1) must be rejected with and without lazy allocation.
+ public void testLazyAllocateMem() throws IOException {
+ this.numOutputs = 10;
+ Configuration conf = getConf();
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128);
+ conf.setBoolean(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false);
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 4500);
try {
- blockSize = PipelinedSorter.computeBlockSize(-1, (10 * 1024 * 1024 * 1024l));
- //block size can't set to -1
- fail();
- } catch(IllegalArgumentException e) {
- Assert.assertTrue(e.getMessage().contains("should be between 1 and Integer.MAX_VALUE"));
+ PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf,
+ numOutputs, (4500l << 20));
+ //Without this, the test would pass silently if no exception were thrown.
+ Assert.fail("Expected IllegalArgumentException for min block size of 4500 MB");
+ } catch (IllegalArgumentException iae) {
+ assertTrue(iae.getMessage().contains(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB));
+ assertTrue(iae.getMessage().contains("value between 0 and 2047"));
}
+
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, -1);
+ try {
+ PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf,
+ numOutputs, (4500l << 20));
+ Assert.fail("Expected IllegalArgumentException for min block size of -1");
+ } catch (IllegalArgumentException iae) {
+ assertTrue(iae.getMessage().contains(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB));
+ assertTrue(iae.getMessage().contains("value between 0 and 2047"));
+ }
+
+ conf.setBoolean(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true);
+ conf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, -1);
+ try {
+ PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf,
+ numOutputs, (4500l << 20));
+ Assert.fail("Expected IllegalArgumentException for min block size of -1 with lazy allocation");
+ } catch (IllegalArgumentException iae) {
+ assertTrue(iae.getMessage().contains(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB));
+ assertTrue(iae.getMessage().contains("value between 0 and 2047"));
+ }
+
+ }
+
+ @Test
+ //Intentionally not having timeout
+ public void testWithLargeKeyValueWithMinBlockSize() throws IOException {
+ //Single partition, 5 entries of 2 MB key & 2 MB value, 48 MB sort buffer, 16 MB min block size.
+ final int keySize = 2 << 20;
+ final long sortBufferSize = 48L << 20;
+ final int minBlockSize = 16 << 20;
+ basicTest(1, 5, keySize, sortBufferSize, minBlockSize);
}
+ //Writes numKeys entries (random keys of length keyLen), then flushes and closes the sorter.
private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException {
+ writeData(sorter, numKeys, keyLen, true);
+ }
+
+ //Writes numKeys entries with random alphanumeric keys of length keyLen, recording each
+ //entry in sortedDataMap for later verification. When autoClose is false the sorter is
+ //left open and the caller is responsible for closing it (e.g. via closeSorter).
+ private void writeData(ExternalSorter sorter, int numKeys, int keyLen,
+ boolean autoClose) throws IOException {
sortedDataMap.clear();
for (int i = 0; i < numKeys; i++) {
Text key = new Text(RandomStringUtils.randomAlphanumeric(keyLen));
@@ -471,8 +706,16 @@ public class TestPipelinedSorter {
sorter.write(key, value);
sortedDataMap.put(key.toString(), value.toString()); //for verifying data later
}
- sorter.flush();
- sorter.close();
+ if (autoClose) {
+ closeSorter(sorter);
+ }
+ }
+
+ //Flushes and then closes the given sorter; a null sorter is ignored.
+ private void closeSorter(ExternalSorter sorter) throws IOException {
+ if (sorter == null) {
+ return;
+ }
+ sorter.flush();
+ sorter.close();
+ }
private void verifyData(IFile.Reader reader)
@@ -481,6 +724,7 @@ public class TestPipelinedSorter {
Text readValue = new Text();
DataInputBuffer keyIn = new DataInputBuffer();
DataInputBuffer valIn = new DataInputBuffer();
+ Configuration conf = getConf();
SerializationFactory serializationFactory = new SerializationFactory(conf);
Deserializer<Text> keyDeserializer = serializationFactory.getDeserializer(Text.class);
Deserializer<Text> valDeserializer = serializationFactory.getDeserializer(Text.class);
http://git-wip-us.apache.org/repos/asf/tez/blob/149a04c2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
index f57731c..fabf52d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
@@ -21,6 +21,7 @@
package org.apache.tez.runtime.library.conf;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -122,6 +123,10 @@ public class TestOrderedPartitionedKVEdgeConfig {
fromConf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 0.11f);
fromConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 123);
fromConf.set("io.shouldExist", "io");
+ fromConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY,
+ true);
+ fromConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB,
+ 1000);
Map<String, String> additionalConfs = new HashMap<String, String>();
additionalConfs.put("test.key.2", "key2");
additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111");
@@ -151,6 +156,10 @@ public class TestOrderedPartitionedKVEdgeConfig {
Configuration outputConf = rebuiltOutput.conf;
Configuration inputConf = rebuiltInput.conf;
+ assertTrue(outputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY,
+ TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY_DEFAULT));
+ assertEquals(1000, outputConf.getInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 2000));
assertEquals(3, outputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, 0));
assertEquals(1111, outputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0));
assertEquals(2222, outputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));