You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yi...@apache.org on 2022/02/04 03:52:26 UTC
[flink] branch master updated: [FLINK-25860][network] Move read buffer allocation and output file creation to setup method for sort-shuffle result partition to avoid blocking the main thread
This is an automated email from the ASF dual-hosted git repository.
yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d54fe51 [FLINK-25860][network] Move read buffer allocation and output file creation to setup method for sort-shuffle result partition to avoid blocking the main thread
d54fe51 is described below
commit d54fe516aaf8c1cddf216c341ac8729f46de899b
Author: Yuxin Tan <ta...@gmail.com>
AuthorDate: Fri Jan 28 18:14:01 2022 +0800
[FLINK-25860][network] Move read buffer allocation and output file creation to setup method for sort-shuffle result partition to avoid blocking the main thread
The read buffer allocation and output file creation of the sort-shuffle result partition are performed by the main thread. These operations are relatively heavy and can block the main thread for a while, which may delay other RPC calls, including the follow-up task deployment. This change aims to solve the issue by moving the read buffer allocation and output file creation to the setup method.
This closes #18554.
---
.../partition/SortMergeResultPartition.java | 46 +++++++++++++++-------
.../SortMergeResultPartitionReadScheduler.java | 3 --
.../SortMergeResultPartitionReadSchedulerTest.java | 1 +
3 files changed, 33 insertions(+), 17 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index 695800e..eabf8db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
-import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.function.SupplierWithException;
@@ -85,12 +84,24 @@ public class SortMergeResultPartition extends ResultPartition {
private final int networkBufferSize;
/** File writer for this result partition. */
- private final PartitionedFileWriter fileWriter;
+ @GuardedBy("lock")
+ private PartitionedFileWriter fileWriter;
+
+ /**
+ * Selected storage path to be used by this result partition to store shuffle data file and
+ * index file.
+ */
+ private final String resultFileBasePath;
/** Subpartition orders of coping data from {@link SortBuffer} and writing to file. */
private final int[] subpartitionOrder;
/**
+ * A shared buffer pool to allocate buffers from when reading data from this result partition.
+ */
+ private final BatchShuffleReadBufferPool readBufferPool;
+
+ /**
* Data read scheduler for this result partition which schedules data read of all subpartitions.
*/
private final SortMergeResultPartitionReadScheduler readScheduler;
@@ -132,6 +143,8 @@ public class SortMergeResultPartition extends ResultPartition {
bufferCompressor,
bufferPoolFactory);
+ this.resultFileBasePath = checkNotNull(resultFileBasePath);
+ this.readBufferPool = checkNotNull(readBufferPool);
this.networkBufferSize = readBufferPool.getBufferSize();
// because IO scheduling will always try to read data in file offset order for better IO
// performance, when writing data to file, we use a random subpartition order to avoid
@@ -141,19 +154,25 @@ public class SortMergeResultPartition extends ResultPartition {
this.readScheduler =
new SortMergeResultPartitionReadScheduler(
numSubpartitions, readBufferPool, readIOExecutor, lock);
-
- PartitionedFileWriter fileWriter = null;
- try {
- // allocate at most 4M heap memory for caching of index entries
- fileWriter = new PartitionedFileWriter(numSubpartitions, 4194304, resultFileBasePath);
- } catch (Throwable throwable) {
- ExceptionUtils.rethrow(throwable);
- }
- this.fileWriter = fileWriter;
}
@Override
public void setup() throws IOException {
+ synchronized (lock) {
+ if (isReleased()) {
+ throw new IOException("Result partition has been released.");
+ }
+ try {
+ // allocate at most 4M heap memory for caching of index entries
+ fileWriter =
+ new PartitionedFileWriter(numSubpartitions, 4194304, resultFileBasePath);
+ } catch (Throwable throwable) {
+ throw new IOException("Failed to create file writer.", throwable);
+ }
+ }
+
+ // initialize the buffer pool eagerly to avoid reporting errors such as OOM too late
+ readBufferPool.initialize();
super.setup();
int expectedWriteBuffers = NUM_WRITE_BUFFER_BYTES / networkBufferSize;
@@ -191,7 +210,7 @@ public class SortMergeResultPartition extends ResultPartition {
@Override
protected void releaseInternal() {
synchronized (lock) {
- if (resultFile == null) {
+ if (resultFile == null && fileWriter != null) {
fileWriter.releaseQuietly();
}
@@ -409,10 +428,9 @@ public class SortMergeResultPartition extends ResultPartition {
checkState(!isReleased(), "Result partition is already released.");
resultFile = fileWriter.finish();
+ super.finish();
LOG.info("New partitioned file produced: {}.", resultFile);
}
-
- super.finish();
}
private void releaseWriteBuffers() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
index 8dd33af..7b287c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
@@ -148,9 +148,6 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
this.maxRequestedBuffers =
Math.max(2 * bufferPool.getNumBuffersPerRequest(), numSubpartitions);
this.bufferRequestTimeout = checkNotNull(bufferRequestTimeout);
-
- // initialize the buffer pool eagerly to avoid reporting errors like OOM too late
- bufferPool.initialize();
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index 090a432..3d77e97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -158,6 +158,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
@Test(expected = IllegalStateException.class)
public void testCreateSubpartitionReaderAfterReleased() throws Exception {
+ bufferPool.initialize();
readScheduler.release();
try {
readScheduler.createSubpartitionReader(