Posted to commits@flink.apache.org by yi...@apache.org on 2022/02/04 03:52:26 UTC

[flink] branch master updated: [FLINK-25860][network] Move read buffer allocation and output file creation to setup method for sort-shuffle result partition to avoid blocking the main thread

This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d54fe51  [FLINK-25860][network] Move read buffer allocation and output file creation to setup method for sort-shuffle result partition to avoid blocking the main thread
d54fe51 is described below

commit d54fe516aaf8c1cddf216c341ac8729f46de899b
Author: Yuxin Tan <ta...@gmail.com>
AuthorDate: Fri Jan 28 18:14:01 2022 +0800

    [FLINK-25860][network] Move read buffer allocation and output file creation to setup method for sort-shuffle result partition to avoid blocking the main thread
    
    The read buffer allocation and output file creation of the sort-shuffle result partition are performed by the main thread. These operations are fairly heavy and can block the main thread for a while, which may delay other RPC calls, including the follow-up task deployment. This change solves the issue by moving the read buffer allocation and output file creation to the setup method.
    
    This closes #18554.
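
    For context, the pattern applied by this commit is to keep the result partition constructor cheap and to defer heavy resource initialization (output file creation and read buffer allocation) to the setup method, which is invoked later during task startup rather than on the main thread. The minimal sketch below illustrates the idea only; the DeferredSetupPartition class and all of its members are hypothetical stand-ins, not Flink code.

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Minimal sketch of the pattern in this commit: keep the constructor cheap and
 * defer heavy initialization to setup(). All names here are illustrative and
 * are not part of the Flink API.
 */
public class DeferredSetupPartition {

    private final Object lock = new Object();
    private final String fileBasePath; // only recorded in the constructor
    private final int bufferSize;
    private final int numBuffers;

    private boolean released;
    private File dataFile;                // created in setup(), not in the constructor
    private List<ByteBuffer> readBuffers; // allocated in setup(), not in the constructor

    public DeferredSetupPartition(String fileBasePath, int bufferSize, int numBuffers) {
        // The constructor only stores configuration, so the thread that creates the
        // partition (e.g. an RPC main thread) is never blocked on disk or memory.
        this.fileBasePath = fileBasePath;
        this.bufferSize = bufferSize;
        this.numBuffers = numBuffers;
    }

    /** Heavy work happens here, after construction. */
    public void setup() throws IOException {
        synchronized (lock) {
            if (released) {
                throw new IOException("Partition has been released.");
            }
            // Output file creation may touch slow disks.
            dataFile = new File(fileBasePath + ".data");
            if (!dataFile.createNewFile()) {
                throw new IOException("Failed to create " + dataFile);
            }
        }
        // Allocate read buffers eagerly so errors such as OOM surface now rather
        // than later during the first read.
        List<ByteBuffer> buffers = new ArrayList<>(numBuffers);
        for (int i = 0; i < numBuffers; i++) {
            buffers.add(ByteBuffer.allocateDirect(bufferSize));
        }
        readBuffers = buffers;
    }

    public void release() {
        synchronized (lock) {
            released = true;
            if (dataFile != null) {
                dataFile.delete();
            }
        }
    }
}

    In the diff below, the same split shows up in SortMergeResultPartition: the constructor now only stores resultFileBasePath and readBufferPool, while setup() creates the PartitionedFileWriter (failing fast if the partition was already released) and eagerly initializes the shared read buffer pool that was previously initialized in the read scheduler's constructor.
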
---
 .../partition/SortMergeResultPartition.java        | 46 +++++++++++++++-------
 .../SortMergeResultPartitionReadScheduler.java     |  3 --
 .../SortMergeResultPartitionReadSchedulerTest.java |  1 +
 3 files changed, 33 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index 695800e..eabf8db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -85,12 +84,24 @@ public class SortMergeResultPartition extends ResultPartition {
     private final int networkBufferSize;
 
     /** File writer for this result partition. */
-    private final PartitionedFileWriter fileWriter;
+    @GuardedBy("lock")
+    private PartitionedFileWriter fileWriter;
+
+    /**
+     * Selected storage path to be used by this result partition to store shuffle data file and
+     * index file.
+     */
+    private final String resultFileBasePath;
 
     /** Subpartition orders of copying data from {@link SortBuffer} and writing to file. */
     private final int[] subpartitionOrder;
 
     /**
+     * A shared buffer pool to allocate buffers from when reading data from this result partition.
+     */
+    private final BatchShuffleReadBufferPool readBufferPool;
+
+    /**
      * Data read scheduler for this result partition which schedules data read of all subpartitions.
      */
     private final SortMergeResultPartitionReadScheduler readScheduler;
@@ -132,6 +143,8 @@ public class SortMergeResultPartition extends ResultPartition {
                 bufferCompressor,
                 bufferPoolFactory);
 
+        this.resultFileBasePath = checkNotNull(resultFileBasePath);
+        this.readBufferPool = checkNotNull(readBufferPool);
         this.networkBufferSize = readBufferPool.getBufferSize();
         // because IO scheduling will always try to read data in file offset order for better IO
         // performance, when writing data to file, we use a random subpartition order to avoid
@@ -141,19 +154,25 @@ public class SortMergeResultPartition extends ResultPartition {
         this.readScheduler =
                 new SortMergeResultPartitionReadScheduler(
                         numSubpartitions, readBufferPool, readIOExecutor, lock);
-
-        PartitionedFileWriter fileWriter = null;
-        try {
-            // allocate at most 4M heap memory for caching of index entries
-            fileWriter = new PartitionedFileWriter(numSubpartitions, 4194304, resultFileBasePath);
-        } catch (Throwable throwable) {
-            ExceptionUtils.rethrow(throwable);
-        }
-        this.fileWriter = fileWriter;
     }
 
     @Override
     public void setup() throws IOException {
+        synchronized (lock) {
+            if (isReleased()) {
+                throw new IOException("Result partition has been released.");
+            }
+            try {
+                // allocate at most 4M heap memory for caching of index entries
+                fileWriter =
+                        new PartitionedFileWriter(numSubpartitions, 4194304, resultFileBasePath);
+            } catch (Throwable throwable) {
+                throw new IOException("Failed to create file writer.", throwable);
+            }
+        }
+
+        // initialize the buffer pool eagerly to avoid reporting errors such as OOM too late
+        readBufferPool.initialize();
         super.setup();
 
         int expectedWriteBuffers = NUM_WRITE_BUFFER_BYTES / networkBufferSize;
@@ -191,7 +210,7 @@ public class SortMergeResultPartition extends ResultPartition {
     @Override
     protected void releaseInternal() {
         synchronized (lock) {
-            if (resultFile == null) {
+            if (resultFile == null && fileWriter != null) {
                 fileWriter.releaseQuietly();
             }
 
@@ -409,10 +428,9 @@ public class SortMergeResultPartition extends ResultPartition {
             checkState(!isReleased(), "Result partition is already released.");
 
             resultFile = fileWriter.finish();
+            super.finish();
             LOG.info("New partitioned file produced: {}.", resultFile);
         }
-
-        super.finish();
     }
 
     private void releaseWriteBuffers() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
index 8dd33af..7b287c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
@@ -148,9 +148,6 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
         this.maxRequestedBuffers =
                 Math.max(2 * bufferPool.getNumBuffersPerRequest(), numSubpartitions);
         this.bufferRequestTimeout = checkNotNull(bufferRequestTimeout);
-
-        // initialize the buffer pool eagerly to avoid reporting errors like OOM too late
-        bufferPool.initialize();
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index 090a432..3d77e97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -158,6 +158,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
 
     @Test(expected = IllegalStateException.class)
     public void testCreateSubpartitionReaderAfterReleased() throws Exception {
+        bufferPool.initialize();
         readScheduler.release();
         try {
             readScheduler.createSubpartitionReader(