You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yi...@apache.org on 2022/07/22 03:31:41 UTC
[flink] branch master updated: [FLINK-28377][network] Decrease the memory size per request for sort-shuffle data read from 8M to 4M
This is an automated email from the ASF dual-hosted git repository.
yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new cfc5d724217 [FLINK-28377][network] Decrease the memory size per request for sort-shuffle data read from 8M to 4M
cfc5d724217 is described below
commit cfc5d724217cda4140d7efda552e83b8ba36ffb6
Author: Tan Yuxin <ta...@gmail.com>
AuthorDate: Fri Jul 22 11:31:35 2022 +0800
[FLINK-28377][network] Decrease the memory size per request for sort-shuffle data read from 8M to 4M
This closes #20325.
---
.../apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java | 4 ++--
.../network/partition/SortMergeResultPartitionReadScheduler.java | 4 ++--
.../flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java | 7 ++++---
3 files changed, 8 insertions(+), 7 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
index 0149ea9e763..e17a460f4bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
@@ -51,10 +51,10 @@ public class BatchShuffleReadBufferPool {
private static final Logger LOG = LoggerFactory.getLogger(BatchShuffleReadBufferPool.class);
/**
- * Memory size in bytes can be allocated from this buffer pool for a single request (8M is for
+ * Memory size in bytes can be allocated from this buffer pool for a single request (4M is for
* better sequential read).
*/
- private static final int NUM_BYTES_PER_REQUEST = 8 * 1024 * 1024;
+ private static final int NUM_BYTES_PER_REQUEST = 4 * 1024 * 1024;
/**
* Wait for at most 2 seconds before returning if there are not enough available buffers currently.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
index bb651aad89e..9785663095a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
@@ -143,10 +143,10 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
this.bufferPool = checkNotNull(bufferPool);
this.ioExecutor = checkNotNull(ioExecutor);
// one partition reader can consume at most Math.max(16M, numSubpartitions) (the expected
- // buffers per request is 8M) buffers for data read, which means larger parallelism, more
+ // buffers per request is 4M) buffers for data read, which means larger parallelism, more
// buffers. Currently, it is only an empirical strategy which can not be configured.
this.maxRequestedBuffers =
- Math.max(2 * bufferPool.getNumBuffersPerRequest(), numSubpartitions);
+ Math.max(4 * bufferPool.getNumBuffersPerRequest(), numSubpartitions);
this.bufferRequestTimeout = checkNotNull(bufferRequestTimeout);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java
index 6a3654b944d..116fb5473bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java
@@ -134,8 +134,9 @@ public class BatchShuffleReadBufferPoolTest {
Map<Object, List<MemorySegment>> buffers = new ConcurrentHashMap<>();
try {
- Object[] owners = new Object[] {new Object(), new Object(), new Object(), new Object()};
- for (int i = 0; i < 4; ++i) {
+ Object[] owners = new Object[8];
+ for (int i = 0; i < 8; ++i) {
+ owners[i] = new Object();
buffers.put(owners[i], bufferPool.requestBuffers());
}
assertEquals(0, bufferPool.getAvailableBuffers());
@@ -173,7 +174,7 @@ public class BatchShuffleReadBufferPoolTest {
assertNull(exception.get());
assertEquals(0, bufferPool.getAvailableBuffers());
- assertEquals(4, buffers.size());
+ assertEquals(8, buffers.size());
} finally {
for (Object owner : buffers.keySet()) {
bufferPool.recycle(buffers.remove(owner));