You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ha...@apache.org on 2020/08/05 23:53:29 UTC
[tez] branch master updated: TEZ-4208 : Pipelinesorter uses single SortSpan after spill (Rajesh Balamohan via Ashutosh Chauhan)
This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 3ce961b TEZ-4208 : Pipelinesorter uses single SortSpan after spill (Rajesh Balamohan via Ashutosh Chauhan)
3ce961b is described below
commit 3ce961bc732322140db4d6120b309134958cac7b
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Wed Aug 5 16:52:46 2020 -0700
TEZ-4208 : Pipelinesorter uses single SortSpan after spill (Rajesh Balamohan via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
.../library/common/sort/impl/PipelinedSorter.java | 11 +++++---
.../common/sort/impl/TestPipelinedSorter.java | 29 ++++++++++++++++++++--
2 files changed, 35 insertions(+), 5 deletions(-)
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index bc68a5f..2ace875 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -116,6 +116,8 @@ public class PipelinedSorter extends ExternalSorter {
//Maintain a list of ByteBuffers
@VisibleForTesting
final List<ByteBuffer> buffers;
+ @VisibleForTesting
+ List<Integer> bufferUsage;
final int maxNumberOfBlocks;
private int bufferIndex = -1;
private final int MIN_BLOCK_SIZE;
@@ -202,6 +204,7 @@ public class PipelinedSorter extends ExternalSorter {
capacity = totalCapacityWithoutMeta;
buffers = Lists.newArrayListWithCapacity(maxNumberOfBlocks);
+ bufferUsage = Lists.newArrayListWithCapacity(maxNumberOfBlocks);
allocateSpace(); //Allocate the first block
if (!lazyAllocateMem) {
LOG.info("Pre allocating rest of memory buffers upfront");
@@ -257,6 +260,7 @@ public class PipelinedSorter extends ExternalSorter {
buffers.add(space);
bufferIndex++;
+ bufferUsage.add(0);
Preconditions.checkState(buffers.size() <= maxNumberOfBlocks,
"Number of blocks " + buffers.size()
@@ -337,8 +341,9 @@ public class PipelinedSorter extends ExternalSorter {
if (pipelinedShuffle && ret) {
sendPipelinedShuffleEvents();
}
- //safe to reset bufferIndex to 0;
- bufferIndex = 0;
+ // Use the next buffer
+ bufferIndex = (bufferIndex + 1) % buffers.size();
+ bufferUsage.set(bufferIndex, bufferUsage.get(bufferIndex) + 1);
int items = 1024*1024;
int perItem = 16;
if(span.length() != 0) {
@@ -1186,7 +1191,7 @@ public class PipelinedSorter extends ExternalSorter {
public int compareTo(SpanIterator other) {
return span.compareInternal(other.getKey(), other.getPartition(), kvindex);
}
-
+
@Override
public String toString() {
return String.format("SpanIterator<%d:%d> (span=%s)", kvindex, maxindex, span.toString());
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index bd7f585..eed423b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -47,7 +47,6 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.combine.Combiner;
-import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.testutils.RandomTextGenerator;
@@ -67,7 +66,6 @@ import java.util.UUID;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyListOf;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -499,6 +497,33 @@ public class TestPipelinedSorter {
verifyOutputPermissions(outputContext.getUniqueIdentifier());
}
+ @Test
+ /**
+ * Verify whether all buffers are used evenly in sorter.
+ */
+ public void basicTestForBufferUsage() throws IOException {
+ this.numOutputs = 1;
+ conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true);
+
+ PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, (100 << 20));
+ Assert.assertTrue(sorter.maxNumberOfBlocks >= 2);
+
+ // Start filling in with data 1MB Key, 1MB Val.
+ for (int i = 0; i < 200; i++) {
+ writeData(sorter, 1, 1024 * 1024, false);
+ }
+
+ // Check if all buffers are evenly used
+ int avg = (int) sorter.bufferUsage.stream().mapToDouble(d -> d).average().orElse(0.0);
+
+ for(int i = 0; i< sorter.bufferUsage.size(); i++) {
+ int usage = sorter.bufferUsage.get(i);
+ Assert.assertTrue("Buffer index " + i + " is not used correctly. "
+ + " usage: " + usage + ", avg: " + avg, usage >= avg);
+ }
+ conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false);
+ }
+
public void basicTest2(int partitions, int[] numkeys, int[] keysize,
long initialAvailableMem, int blockSize) throws IOException {
this.numOutputs = partitions; // single output