You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/02 01:57:21 UTC

[flink] 06/13: [FLINK-27908] ResultPartition subclasses now override setupInternal() instead of setup() to do their initialization work.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e930077b9057aae6f634e8638f38949299881887
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Jul 28 16:54:04 2022 +0800

    [FLINK-27908] ResultPartition subclasses now override setupInternal() instead of setup() to do their initialization work.
---
 .../runtime/io/network/partition/BufferWritingResultPartition.java    | 4 +---
 .../apache/flink/runtime/io/network/partition/ResultPartition.java    | 4 ++++
 .../flink/runtime/io/network/partition/SortMergeResultPartition.java  | 4 +---
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
index 2422f3ad808..8b36a7587d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
@@ -94,9 +94,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition {
     }
 
     @Override
-    public void setup() throws IOException {
-        super.setup();
-
+    protected void setupInternal() throws IOException {
         checkState(
                 bufferPool.getNumberOfRequiredMemorySegments() >= getNumberOfSubpartitions(),
                 "Bug in result partition setup logic: Buffer pool has not enough guaranteed buffers for"
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index ccdf8f4a138..e3b2e2ff74e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -158,9 +158,13 @@ public abstract class ResultPartition implements ResultPartitionWriter {
                 "Bug in result partition setup logic: Already registered buffer pool.");
 
         this.bufferPool = checkNotNull(bufferPoolFactory.get());
+        setupInternal();
         partitionManager.registerResultPartition(this);
     }
 
+    /** Do the subclass's own setup operation. */
+    protected abstract void setupInternal() throws IOException;
+
     public String getOwningTaskName() {
         return owningTaskName;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index 1620c36beea..a8c678db053 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -173,7 +173,7 @@ public class SortMergeResultPartition extends ResultPartition {
     }
 
     @Override
-    public void setup() throws IOException {
+    protected void setupInternal() throws IOException {
         synchronized (lock) {
             if (isReleased()) {
                 throw new IOException("Result partition has been released.");
@@ -189,8 +189,6 @@ public class SortMergeResultPartition extends ResultPartition {
 
         // initialize the buffer pool eagerly to avoid reporting errors such as OOM too late
         readBufferPool.initialize();
-        super.setup();
-
         LOG.info("Sort-merge partition {} initialized.", getPartitionId());
     }