You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@tez.apache.org by ha...@apache.org on 2020/08/05 23:53:29 UTC

[tez] branch master updated: TEZ-4208 : Pipelinesorter uses single SortSpan after spill (Rajesh Balamohan via Ashutosh Chauhan)

This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ce961b  TEZ-4208 : Pipelinesorter uses single SortSpan after spill (Rajesh Balamohan via Ashutosh Chauhan)
3ce961b is described below

commit 3ce961bc732322140db4d6120b309134958cac7b
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Wed Aug 5 16:52:46 2020 -0700

    TEZ-4208 : Pipelinesorter uses single SortSpan after spill (Rajesh Balamohan via Ashutosh Chauhan)
    
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 .../library/common/sort/impl/PipelinedSorter.java  | 11 +++++---
 .../common/sort/impl/TestPipelinedSorter.java      | 29 ++++++++++++++++++++--
 2 files changed, 35 insertions(+), 5 deletions(-)

diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index bc68a5f..2ace875 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -116,6 +116,8 @@ public class PipelinedSorter extends ExternalSorter {
   //Maintain a list of ByteBuffers
   @VisibleForTesting
   final List<ByteBuffer> buffers;
+  @VisibleForTesting
+  List<Integer> bufferUsage;
   final int maxNumberOfBlocks;
   private int bufferIndex = -1;
   private final int MIN_BLOCK_SIZE;
@@ -202,6 +204,7 @@ public class PipelinedSorter extends ExternalSorter {
     capacity = totalCapacityWithoutMeta;
 
     buffers = Lists.newArrayListWithCapacity(maxNumberOfBlocks);
+    bufferUsage = Lists.newArrayListWithCapacity(maxNumberOfBlocks);
     allocateSpace(); //Allocate the first block
     if (!lazyAllocateMem) {
       LOG.info("Pre allocating rest of memory buffers upfront");
@@ -257,6 +260,7 @@ public class PipelinedSorter extends ExternalSorter {
 
     buffers.add(space);
     bufferIndex++;
+    bufferUsage.add(0);
 
     Preconditions.checkState(buffers.size() <= maxNumberOfBlocks,
         "Number of blocks " + buffers.size()
@@ -337,8 +341,9 @@ public class PipelinedSorter extends ExternalSorter {
       if (pipelinedShuffle && ret) {
         sendPipelinedShuffleEvents();
       }
-      //safe to reset bufferIndex to 0;
-      bufferIndex = 0;
+      // Use the next buffer
+      bufferIndex = (bufferIndex + 1) % buffers.size();
+      bufferUsage.set(bufferIndex, bufferUsage.get(bufferIndex) + 1);
       int items = 1024*1024;
       int perItem = 16;
       if(span.length() != 0) {
@@ -1186,7 +1191,7 @@ public class PipelinedSorter extends ExternalSorter {
     public int compareTo(SpanIterator other) {
       return span.compareInternal(other.getKey(), other.getPartition(), kvindex);
     }
-    
+
     @Override
     public String toString() {
       return String.format("SpanIterator<%d:%d> (span=%s)", kvindex, maxindex, span.toString());
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index bd7f585..eed423b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -47,7 +47,6 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats;
 import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.combine.Combiner;
-import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.apache.tez.runtime.library.testutils.RandomTextGenerator;
@@ -67,7 +66,6 @@ import java.util.UUID;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -499,6 +497,33 @@ public class TestPipelinedSorter {
     verifyOutputPermissions(outputContext.getUniqueIdentifier());
   }
 
+  @Test
+  /** Verify that repeated spills rotate through every sorter buffer instead of always reusing buffer 0.
+   * NOTE(review): this Javadoc follows @Test; Javadoc conventionally precedes annotations, so tools may not attach it.
+   */
+  public void basicTestForBufferUsage() throws IOException {
+    this.numOutputs = 1;
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true);
+
+    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, (100 << 20));
+    Assert.assertTrue(sorter.maxNumberOfBlocks >= 2);
+
+    // 200 writes of (per original note) 1MB key + 1MB value each; expected to overflow the 100MB sorter and spill repeatedly.
+    for (int i = 0; i < 200; i++) {
+      writeData(sorter, 1, 1024 * 1024, false);
+    }
+
+    // Evenness check: no buffer's post-spill selection count (bufferUsage) may fall below the mean, truncated to int.
+    int avg = (int) sorter.bufferUsage.stream().mapToDouble(d -> d).average().orElse(0.0);
+
+    for(int i = 0; i< sorter.bufferUsage.size(); i++) {
+      int usage = sorter.bufferUsage.get(i);
+      Assert.assertTrue("Buffer index " + i + " is not used correctly. "
+              + " usage: " + usage + ", avg: " + avg, usage >= avg);
+    }
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false); // undo the flag set above; NOTE(review): skipped if an assertion fails, consider try/finally
+  }
+
   public void basicTest2(int partitions, int[] numkeys, int[] keysize,
       long initialAvailableMem, int  blockSize) throws IOException {
     this.numOutputs = partitions; // single output