You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yi...@apache.org on 2022/08/08 07:24:56 UTC

[flink] branch master updated: [FLINK-28823][network] Consider the actual number of subpartition readers when requesting buffers for SortMergeResultPartitionReadScheduler

This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ca45a28205b [FLINK-28823][network] Consider the actual number of subpartition readers when requesting buffers for SortMergeResultPartitionReadScheduler
ca45a28205b is described below

commit ca45a28205b424c2c77a21366fb29a67457672ff
Author: Yuxin Tan <ta...@gmail.com>
AuthorDate: Mon Aug 8 15:24:48 2022 +0800

    [FLINK-28823][network] Consider the actual number of subpartition readers when requesting buffers for SortMergeResultPartitionReadScheduler
    
    This closes #20459.
---
 .../partition/SortMergeResultPartition.java        |  3 +--
 .../SortMergeResultPartitionReadScheduler.java     | 22 ++++++++--------------
 .../SortMergeResultPartitionReadSchedulerTest.java |  8 +++-----
 3 files changed, 12 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index a8c678db053..aaa3d42bc19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -168,8 +168,7 @@ public class SortMergeResultPartition extends ResultPartition {
         // input balance of the downstream tasks
         this.subpartitionOrder = getRandomSubpartitionOrder(numSubpartitions);
         this.readScheduler =
-                new SortMergeResultPartitionReadScheduler(
-                        numSubpartitions, readBufferPool, readIOExecutor, lock);
+                new SortMergeResultPartitionReadScheduler(readBufferPool, readIOExecutor, lock);
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
index 90f89262bbe..510ea614aa5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
@@ -85,9 +85,6 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
     /** Executor to run the shuffle data reading task. */
     private final Executor ioExecutor;
 
-    /** Maximum number of buffers can be allocated by this partition reader. */
-    private final int maxRequestedBuffers;
-
     /**
      * Maximum time to wait when requesting read buffers from the buffer pool before throwing an
      * exception.
@@ -133,15 +130,11 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
     private volatile boolean isReleased;
 
     SortMergeResultPartitionReadScheduler(
-            int numSubpartitions,
-            BatchShuffleReadBufferPool bufferPool,
-            Executor ioExecutor,
-            Object lock) {
-        this(numSubpartitions, bufferPool, ioExecutor, lock, DEFAULT_BUFFER_REQUEST_TIMEOUT);
+            BatchShuffleReadBufferPool bufferPool, Executor ioExecutor, Object lock) {
+        this(bufferPool, ioExecutor, lock, DEFAULT_BUFFER_REQUEST_TIMEOUT);
     }
 
     SortMergeResultPartitionReadScheduler(
-            int numSubpartitions,
             BatchShuffleReadBufferPool bufferPool,
             Executor ioExecutor,
             Object lock,
@@ -150,11 +143,6 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
         this.lock = checkNotNull(lock);
         this.bufferPool = checkNotNull(bufferPool);
         this.ioExecutor = checkNotNull(ioExecutor);
-        // one partition reader can consume at most Math.max(16M, numSubpartitions) (the expected
-        // buffers per request is 4M) buffers for data read, which means larger parallelism, more
-        // buffers. Currently, it is only an empirical strategy which can not be configured.
-        this.maxRequestedBuffers =
-                Math.max(4 * bufferPool.getNumBuffersPerRequest(), numSubpartitions);
         this.bufferRequestTimeout = checkNotNull(bufferRequestTimeout);
     }
 
@@ -406,6 +394,12 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
     private void mayTriggerReading() {
         assert Thread.holdsLock(lock);
 
+        // one partition reader can use at most max(4 * buffersPerRequest, 2 * numReaders) buffers
+        // for data read (at least 16M, as one request is expected to be 4M), so more readers mean
+        // more buffers. Currently, it is only an empirical strategy which cannot be configured.
+        int maxRequestedBuffers =
+                Math.max(4 * bufferPool.getNumBuffersPerRequest(), 2 * allReaders.size());
+
         if (!isRunning
                 && !allReaders.isEmpty()
                 && numRequestedBuffers + bufferPool.getNumBuffersPerRequest() <= maxRequestedBuffers
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index ee442bcdc44..d65d408d1ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -105,9 +105,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
                 new PartitionedFileReader(partitionedFile, 0, dataFileChannel, indexFileChannel);
         bufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize);
         executor = Executors.newFixedThreadPool(numThreads);
-        readScheduler =
-                new SortMergeResultPartitionReadScheduler(
-                        numSubpartitions, bufferPool, executor, this);
+        readScheduler = new SortMergeResultPartitionReadScheduler(bufferPool, executor, this);
     }
 
     @After
@@ -254,7 +252,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
         List<MemorySegment> buffers = bufferPool.requestBuffers();
         SortMergeResultPartitionReadScheduler readScheduler =
                 new SortMergeResultPartitionReadScheduler(
-                        numSubpartitions, bufferPool, executor, this, bufferRequestTimeout);
+                        bufferPool, executor, this, bufferRequestTimeout);
 
         long startTimestamp = System.nanoTime();
         Assertions.assertThatThrownBy(readScheduler::allocateBuffers)
@@ -273,7 +271,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
                 new FakeBatchShuffleReadBufferPool(bufferSize * 3, bufferSize);
         SortMergeResultPartitionReadScheduler readScheduler =
                 new SortMergeResultPartitionReadScheduler(
-                        numSubpartitions, bufferPool, executor, this, bufferRequestTimeout);
+                        bufferPool, executor, this, bufferRequestTimeout);
         SortMergeSubpartitionReader subpartitionReader =
                 new SortMergeSubpartitionReader(new NoOpBufferAvailablityListener(), fileReader);