You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@flink.apache.org by yi...@apache.org on 2022/08/08 07:24:56 UTC
[flink] branch master updated: [FLINK-28823][network] Consider the actual number of subpartition readers when requesting buffers for SortMergeResultPartitionReadScheduler
This is an automated email from the ASF dual-hosted git repository.
yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ca45a28205b [FLINK-28823][network] Consider the actual number of subpartition readers when requesting buffers for SortMergeResultPartitionReadScheduler
ca45a28205b is described below
commit ca45a28205b424c2c77a21366fb29a67457672ff
Author: Yuxin Tan <ta...@gmail.com>
AuthorDate: Mon Aug 8 15:24:48 2022 +0800
[FLINK-28823][network] Consider the actual number of subpartition readers when requesting buffers for SortMergeResultPartitionReadScheduler
This closes #20459.
---
.../partition/SortMergeResultPartition.java | 3 +--
.../SortMergeResultPartitionReadScheduler.java | 22 ++++++++--------------
.../SortMergeResultPartitionReadSchedulerTest.java | 8 +++-----
3 files changed, 12 insertions(+), 21 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index a8c678db053..aaa3d42bc19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -168,8 +168,7 @@ public class SortMergeResultPartition extends ResultPartition {
// input balance of the downstream tasks
this.subpartitionOrder = getRandomSubpartitionOrder(numSubpartitions);
this.readScheduler =
- new SortMergeResultPartitionReadScheduler(
- numSubpartitions, readBufferPool, readIOExecutor, lock);
+ new SortMergeResultPartitionReadScheduler(readBufferPool, readIOExecutor, lock);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
index 90f89262bbe..510ea614aa5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
@@ -85,9 +85,6 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
/** Executor to run the shuffle data reading task. */
private final Executor ioExecutor;
- /** Maximum number of buffers can be allocated by this partition reader. */
- private final int maxRequestedBuffers;
-
/**
* Maximum time to wait when requesting read buffers from the buffer pool before throwing an
* exception.
@@ -133,15 +130,11 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
private volatile boolean isReleased;
SortMergeResultPartitionReadScheduler(
- int numSubpartitions,
- BatchShuffleReadBufferPool bufferPool,
- Executor ioExecutor,
- Object lock) {
- this(numSubpartitions, bufferPool, ioExecutor, lock, DEFAULT_BUFFER_REQUEST_TIMEOUT);
+ BatchShuffleReadBufferPool bufferPool, Executor ioExecutor, Object lock) {
+ this(bufferPool, ioExecutor, lock, DEFAULT_BUFFER_REQUEST_TIMEOUT);
}
SortMergeResultPartitionReadScheduler(
- int numSubpartitions,
BatchShuffleReadBufferPool bufferPool,
Executor ioExecutor,
Object lock,
@@ -150,11 +143,6 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
this.lock = checkNotNull(lock);
this.bufferPool = checkNotNull(bufferPool);
this.ioExecutor = checkNotNull(ioExecutor);
- // one partition reader can consume at most Math.max(16M, numSubpartitions) (the expected
- // buffers per request is 4M) buffers for data read, which means larger parallelism, more
- // buffers. Currently, it is only an empirical strategy which can not be configured.
- this.maxRequestedBuffers =
- Math.max(4 * bufferPool.getNumBuffersPerRequest(), numSubpartitions);
this.bufferRequestTimeout = checkNotNull(bufferRequestTimeout);
}
@@ -406,6 +394,12 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
private void mayTriggerReading() {
assert Thread.holdsLock(lock);
+ // one partition reader can consume at most Math.max(16M, 2 * numReaders) (the expected
+ // buffers per request is 4M) buffers for data read, which means larger parallelism, more
+ // buffers. Currently, it is only an empirical strategy which can not be configured.
+ int maxRequestedBuffers =
+ Math.max(4 * bufferPool.getNumBuffersPerRequest(), 2 * allReaders.size());
+
if (!isRunning
&& !allReaders.isEmpty()
&& numRequestedBuffers + bufferPool.getNumBuffersPerRequest() <= maxRequestedBuffers
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index ee442bcdc44..d65d408d1ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -105,9 +105,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
new PartitionedFileReader(partitionedFile, 0, dataFileChannel, indexFileChannel);
bufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize);
executor = Executors.newFixedThreadPool(numThreads);
- readScheduler =
- new SortMergeResultPartitionReadScheduler(
- numSubpartitions, bufferPool, executor, this);
+ readScheduler = new SortMergeResultPartitionReadScheduler(bufferPool, executor, this);
}
@After
@@ -254,7 +252,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
List<MemorySegment> buffers = bufferPool.requestBuffers();
SortMergeResultPartitionReadScheduler readScheduler =
new SortMergeResultPartitionReadScheduler(
- numSubpartitions, bufferPool, executor, this, bufferRequestTimeout);
+ bufferPool, executor, this, bufferRequestTimeout);
long startTimestamp = System.nanoTime();
Assertions.assertThatThrownBy(readScheduler::allocateBuffers)
@@ -273,7 +271,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
new FakeBatchShuffleReadBufferPool(bufferSize * 3, bufferSize);
SortMergeResultPartitionReadScheduler readScheduler =
new SortMergeResultPartitionReadScheduler(
- numSubpartitions, bufferPool, executor, this, bufferRequestTimeout);
+ bufferPool, executor, this, bufferRequestTimeout);
SortMergeSubpartitionReader subpartitionReader =
new SortMergeSubpartitionReader(new NoOpBufferAvailablityListener(), fileReader);