You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/02 01:57:21 UTC
[flink] 06/13: [FLINK-27908] ResultPartition's subclass using setupInternal instead of setup to do initialization work.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e930077b9057aae6f634e8638f38949299881887
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Jul 28 16:54:04 2022 +0800
[FLINK-27908] ResultPartition's subclass using setupInternal instead of setup to do initialization work.
---
.../runtime/io/network/partition/BufferWritingResultPartition.java | 4 +---
.../apache/flink/runtime/io/network/partition/ResultPartition.java | 4 ++++
.../flink/runtime/io/network/partition/SortMergeResultPartition.java | 4 +---
3 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
index 2422f3ad808..8b36a7587d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
@@ -94,9 +94,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition {
}
@Override
- public void setup() throws IOException {
- super.setup();
-
+ protected void setupInternal() throws IOException {
checkState(
bufferPool.getNumberOfRequiredMemorySegments() >= getNumberOfSubpartitions(),
"Bug in result partition setup logic: Buffer pool has not enough guaranteed buffers for"
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index ccdf8f4a138..e3b2e2ff74e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -158,9 +158,13 @@ public abstract class ResultPartition implements ResultPartitionWriter {
"Bug in result partition setup logic: Already registered buffer pool.");
this.bufferPool = checkNotNull(bufferPoolFactory.get());
+ setupInternal();
partitionManager.registerResultPartition(this);
}
+ /** Do the subclass's own setup operation. */
+ protected abstract void setupInternal() throws IOException;
+
public String getOwningTaskName() {
return owningTaskName;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index 1620c36beea..a8c678db053 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -173,7 +173,7 @@ public class SortMergeResultPartition extends ResultPartition {
}
@Override
- public void setup() throws IOException {
+ protected void setupInternal() throws IOException {
synchronized (lock) {
if (isReleased()) {
throw new IOException("Result partition has been released.");
@@ -189,8 +189,6 @@ public class SortMergeResultPartition extends ResultPartition {
// initialize the buffer pool eagerly to avoid reporting errors such as OOM too late
readBufferPool.initialize();
- super.setup();
-
LOG.info("Sort-merge partition {} initialized.", getPartitionId());
}