You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yi...@apache.org on 2022/07/22 03:31:41 UTC

[flink] branch master updated: [FLINK-28377][network] Decrease the memory size per request for sort-shuffle data read from 8M to 4M

This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cfc5d724217 [FLINK-28377][network] Decrease the memory size per request for sort-shuffle data read from 8M to 4M
cfc5d724217 is described below

commit cfc5d724217cda4140d7efda552e83b8ba36ffb6
Author: Tan Yuxin <ta...@gmail.com>
AuthorDate: Fri Jul 22 11:31:35 2022 +0800

    [FLINK-28377][network] Decrease the memory size per request for sort-shuffle data read from 8M to 4M
    
    This closes #20325.
---
 .../apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java   | 4 ++--
 .../network/partition/SortMergeResultPartitionReadScheduler.java   | 4 ++--
 .../flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java      | 7 ++++---
 3 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
index 0149ea9e763..e17a460f4bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
@@ -51,10 +51,10 @@ public class BatchShuffleReadBufferPool {
     private static final Logger LOG = LoggerFactory.getLogger(BatchShuffleReadBufferPool.class);
 
     /**
-     * Memory size in bytes can be allocated from this buffer pool for a single request (8M is for
+     * Memory size in bytes can be allocated from this buffer pool for a single request (4M is for
      * better sequential read).
      */
-    private static final int NUM_BYTES_PER_REQUEST = 8 * 1024 * 1024;
+    private static final int NUM_BYTES_PER_REQUEST = 4 * 1024 * 1024;
 
     /**
      * Wait for at most 2 seconds before return if there is no enough available buffers currently.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
index bb651aad89e..9785663095a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
@@ -143,10 +143,10 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
         this.bufferPool = checkNotNull(bufferPool);
         this.ioExecutor = checkNotNull(ioExecutor);
         // one partition reader can consume at most Math.max(16M, numSubpartitions) (the expected
-        // buffers per request is 8M) buffers for data read, which means larger parallelism, more
+        // buffers per request is 4M) buffers for data read, which means larger parallelism, more
         // buffers. Currently, it is only an empirical strategy which can not be configured.
         this.maxRequestedBuffers =
-                Math.max(2 * bufferPool.getNumBuffersPerRequest(), numSubpartitions);
+                Math.max(4 * bufferPool.getNumBuffersPerRequest(), numSubpartitions);
         this.bufferRequestTimeout = checkNotNull(bufferRequestTimeout);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java
index 6a3654b944d..116fb5473bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java
@@ -134,8 +134,9 @@ public class BatchShuffleReadBufferPoolTest {
         Map<Object, List<MemorySegment>> buffers = new ConcurrentHashMap<>();
 
         try {
-            Object[] owners = new Object[] {new Object(), new Object(), new Object(), new Object()};
-            for (int i = 0; i < 4; ++i) {
+            Object[] owners = new Object[8];
+            for (int i = 0; i < 8; ++i) {
+                owners[i] = new Object();
                 buffers.put(owners[i], bufferPool.requestBuffers());
             }
             assertEquals(0, bufferPool.getAvailableBuffers());
@@ -173,7 +174,7 @@ public class BatchShuffleReadBufferPoolTest {
 
             assertNull(exception.get());
             assertEquals(0, bufferPool.getAvailableBuffers());
-            assertEquals(4, buffers.size());
+            assertEquals(8, buffers.size());
         } finally {
             for (Object owner : buffers.keySet()) {
                 bufferPool.recycle(buffers.remove(owner));