Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/02 01:57:15 UTC

[flink] branch master updated (c3e72be836e -> 4a2f3a15903)

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from c3e72be836e [FLINK-28495][docs][docs-zh] Fix typos or mistakes of Flink CEP Document. This closes #20272
     new fc619df9b52 [hotfix] Make HsMemoryDataManager runWithLockMethod private.
     new 42f05f0cddd [hotfix] HsMemoryDataManager spillAsync's callback should assertNoException.
     new 9cde5322969 [hotfix] Rename HsResultPartitionReadScheduler to HsFileDataManager, as it plays the same role as the FileDataManager mentioned in the FLIP.
     new f3bbb6b6a9b [hotfix] Simplify the logic for releasing and failing readers in HsFileDataManager.
     new 9331e7502e9 [FLINK-27908] HsBufferContext ignores repeated startSpilling and release calls instead of failing checkState.
     new e930077b905 [FLINK-27908] ResultPartition's subclasses use setupInternal instead of setup to do initialization work.
     new 828b3a58aca [FLINK-27908] Extends onResultPartitionClosed to HsSpillingStrategy.
     new 0d185b86bd4 [FLINK-27908] Add lifecycle method to HsFileDataManager, HsMemoryDataManager and HsMemoryDataSpiller.
     new e2d50c4777d [FLINK-27908] HybridShuffleConfiguration supports setting the spilling strategy type.
     new 347423f8464 [FLINK-27908] Let HsMemoryDataManager register HsSubpartitionViewInternalOperations and support notifyDataAvailable.
     new d0b13381483 [FLINK-27908] Introduce HsSubpartitionView based on HsDataView and its implementations.
     new fed2962d3a2 [FLINK-27908] Introduce HsResultPartition
     new 4a2f3a15903 [FLINK-27908] ResultPartitionFactory also supports HYBRID type.

The 13 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../partition/BufferWritingResultPartition.java    |   4 +-
 .../io/network/partition/ResultPartition.java      |   4 +
 .../network/partition/ResultPartitionFactory.java  |  22 +
 .../io/network/partition/ResultPartitionType.java  |   2 +-
 .../partition/SortMergeResultPartition.java        |   4 +-
 .../network/partition/hybrid/HsBufferContext.java  |  22 +-
 .../io/network/partition/hybrid/HsDataView.java    |  60 +++
 ...onReadScheduler.java => HsFileDataManager.java} |  81 ++--
 .../partition/hybrid/HsFullSpillingStrategy.java   |  20 +
 .../partition/hybrid/HsMemoryDataManager.java      | 104 +++--
 .../hybrid/HsMemoryDataManagerOperation.java       |   7 +
 .../partition/hybrid/HsMemoryDataSpiller.java      |  37 +-
 .../partition/hybrid/HsResultPartition.java        | 266 ++++++++++++
 .../hybrid/HsSelectiveSpillingStrategy.java        |  20 +
 .../partition/hybrid/HsSpillingStrategy.java       |  15 +
 .../partition/hybrid/HsSubpartitionFileReader.java |   6 +-
 .../hybrid/HsSubpartitionFileReaderImpl.java       |  90 +++-
 .../hybrid/HsSubpartitionMemoryDataManager.java    |  67 ++-
 .../partition/hybrid/HsSubpartitionView.java       | 262 ++++++++++++
 .../HsSubpartitionViewInternalOperations.java      |   4 +-
 .../hybrid/HybridShuffleConfiguration.java         |  32 +-
 .../partition/ResultPartitionFactoryTest.java      |  16 +
 .../partition/hybrid/HsBufferContextTest.java      |  17 +-
 ...hedulerTest.java => HsFileDataManagerTest.java} |  87 ++--
 .../hybrid/HsFullSpillingStrategyTest.java         |  33 ++
 .../partition/hybrid/HsMemoryDataManagerTest.java  |  30 +-
 .../partition/hybrid/HsMemoryDataSpillerTest.java  |  42 +-
 .../partition/hybrid/HsResultPartitionTest.java    | 476 +++++++++++++++++++++
 .../hybrid/HsSelectiveSpillingStrategyTest.java    |  32 ++
 .../hybrid/HsSubpartitionFileReaderImplTest.java   |  88 +++-
 .../HsSubpartitionMemoryDataManagerTest.java       |  20 +-
 .../partition/hybrid/HsSubpartitionViewTest.java   | 313 ++++++++++++++
 .../partition/hybrid/TestingHsDataView.java        | 127 ++++++
 .../hybrid/TestingMemoryDataManagerOperation.java  |  21 +-
 .../partition/hybrid/TestingSpillingStrategy.java  |  23 +-
 .../TestingSubpartitionViewInternalOperation.java  |   2 +-
 36 files changed, 2258 insertions(+), 198 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java
 rename flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/{HsResultPartitionReadScheduler.java => HsFileDataManager.java} (89%)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
 rename flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/{HsResultPartitionReadSchedulerTest.java => HsFileDataManagerTest.java} (84%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingHsDataView.java


[flink] 04/13: [hotfix] Simplify the logic for releasing and failing readers in HsFileDataManager.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f3bbb6b6a9b26a135e3e257e51be942d6d8bedf0
Author: Weijie Guo <re...@163.com>
AuthorDate: Fri Jul 29 15:50:45 2022 +0800

    [hotfix] Simplify the logic for releasing and failing readers in HsFileDataManager.
---
 .../partition/hybrid/HsFileDataManager.java        | 32 +++++++---------------
 1 file changed, 10 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
index dab671aa723..dbcd204300b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
@@ -93,10 +93,6 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
 
     private final HybridShuffleConfiguration hybridShuffleConfiguration;
 
-    /** All failed subpartition readers to be released. */
-    @GuardedBy("lock")
-    private final Set<HsSubpartitionFileReader> failedReaders = new HashSet<>();
-
     /** All readers waiting to read data of different subpartitions. */
     @GuardedBy("lock")
     private final Set<HsSubpartitionFileReader> allReaders = new HashSet<>();
@@ -178,7 +174,6 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
             }
             isReleased = true;
 
-            failedReaders.addAll(allReaders);
             List<HsSubpartitionFileReader> pendingReaders = new ArrayList<>(allReaders);
             mayNotifyReleased();
             failSubpartitionReaders(
@@ -324,7 +319,7 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
     private void failSubpartitionReaders(
             Collection<HsSubpartitionFileReader> readers, Throwable failureCause) {
         synchronized (lock) {
-            failedReaders.addAll(readers);
+            removeSubpartitionReaders(readers);
         }
 
         for (HsSubpartitionFileReader reader : readers) {
@@ -332,9 +327,17 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
         }
     }
 
+    @GuardedBy("lock")
+    private void removeSubpartitionReaders(Collection<HsSubpartitionFileReader> readers) {
+        allReaders.removeAll(readers);
+        if (allReaders.isEmpty()) {
+            bufferPool.unregisterRequester(this);
+            closeFileChannel();
+        }
+    }
+
     private void endCurrentRoundOfReading(int numBuffersRead) {
         synchronized (lock) {
-            removeFailedReaders();
             numRequestedBuffers += numBuffersRead;
             isRunning = false;
             mayTriggerReading();
@@ -342,21 +345,6 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
         }
     }
 
-    @GuardedBy("lock")
-    private void removeFailedReaders() {
-        assert Thread.holdsLock(lock);
-
-        for (HsSubpartitionFileReader reader : failedReaders) {
-            allReaders.remove(reader);
-        }
-        failedReaders.clear();
-
-        if (allReaders.isEmpty()) {
-            bufferPool.unregisterRequester(this);
-            closeFileChannel();
-        }
-    }
-
     @GuardedBy("lock")
     private void lazyInitialize() throws IOException {
         assert Thread.holdsLock(lock);
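
A minimal sketch of the pattern this commit moves to, using hypothetical
simplified types rather than the actual Flink classes: failed readers are
removed from the single tracking set eagerly, and the shared resources
(buffer pool registration, file channel) are torn down the moment the last
reader disappears, so no second "failedReaders" set and no sweep step are
needed.

    import java.util.Collection;
    import java.util.HashSet;
    import java.util.Set;

    // Hypothetical stand-in for the reader bookkeeping in HsFileDataManager.
    public class ReaderRegistry<R> {
        private final Object lock = new Object();
        private final Set<R> allReaders = new HashSet<>();
        private final Runnable onLastReaderRemoved;

        public ReaderRegistry(Runnable onLastReaderRemoved) {
            this.onLastReaderRemoved = onLastReaderRemoved;
        }

        public void register(R reader) {
            synchronized (lock) {
                allReaders.add(reader);
            }
        }

        // Mirrors removeSubpartitionReaders in the diff: runs under the lock
        // and releases shared resources on the last removal.
        public void remove(Collection<R> readers) {
            synchronized (lock) {
                allReaders.removeAll(readers);
                if (allReaders.isEmpty()) {
                    onLastReaderRemoved.run();
                }
            }
        }
    }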


[flink] 01/13: [hotfix] Make HsMemoryDataManager runWithLockMethod private.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc619df9b520e33c332d39a34ea05bdc3ec3f4df
Author: Weijie Guo <re...@163.com>
AuthorDate: Tue Jul 26 20:34:58 2022 +0800

    [hotfix] Make HsMemoryDataManager runWithLockMethod private.
---
 .../flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
index 0a7c60cf9b2..b58338b6912 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
@@ -255,7 +255,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
         bufferPool.recycle(buffer);
     }
 
-    public <T, R extends Exception> T callWithLock(SupplierWithException<T, R> callable) throws R {
+    private <T, R extends Exception> T callWithLock(SupplierWithException<T, R> callable) throws R {
         try {
             lock.lock();
             return callable.get();
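
A note for readers: the method this commit makes private wraps a lock
acquisition around a supplier whose checked exception type is carried through
generically, so each caller keeps its precise exception signature. A minimal
self-contained sketch of the pattern, assuming a simplified stand-in for
Flink's SupplierWithException and using the conventional lock-before-try
ordering:

    import java.util.concurrent.locks.ReentrantLock;

    public class LockedCallExample {
        // Simplified stand-in for org.apache.flink.util.function.SupplierWithException.
        @FunctionalInterface
        interface SupplierWithException<T, E extends Exception> {
            T get() throws E;
        }

        private final ReentrantLock lock = new ReentrantLock();

        // Runs the supplier under the lock; the checked exception type R is
        // propagated to the caller unchanged.
        private <T, R extends Exception> T callWithLock(SupplierWithException<T, R> callable)
                throws R {
            lock.lock();
            try {
                return callable.get();
            } finally {
                lock.unlock();
            }
        }

        public int sizeSnapshot(java.util.List<?> guardedList) {
            // The explicit type witness pins R to an unchecked exception type.
            return this.<Integer, RuntimeException>callWithLock(guardedList::size);
        }
    }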


[flink] 11/13: [FLINK-27908] Introduce HsSubpartitionView based on HsDataView and its implementations.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d0b133814838e3614a41e70b5e6a27b80afa0967
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Jul 28 20:14:52 2022 +0800

    [FLINK-27908] Introduce HsSubpartitionView based on HsDataView and its implementations.
---
 .../io/network/partition/hybrid/HsDataView.java    |  60 ++++
 .../partition/hybrid/HsFileDataManager.java        |  16 +-
 .../partition/hybrid/HsMemoryDataManager.java      |  23 +-
 .../partition/hybrid/HsSubpartitionFileReader.java |   6 +-
 .../hybrid/HsSubpartitionFileReaderImpl.java       |  86 +++++-
 .../hybrid/HsSubpartitionMemoryDataManager.java    |  29 +-
 .../partition/hybrid/HsSubpartitionView.java       | 262 +++++++++++++++++
 .../partition/hybrid/HsFileDataManagerTest.java    |  27 +-
 .../hybrid/HsSubpartitionFileReaderImplTest.java   |  88 +++++-
 .../HsSubpartitionMemoryDataManagerTest.java       |  20 +-
 .../partition/hybrid/HsSubpartitionViewTest.java   | 313 +++++++++++++++++++++
 .../partition/hybrid/TestingHsDataView.java        | 127 +++++++++
 12 files changed, 1011 insertions(+), 46 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java
new file mode 100644
index 00000000000..c3d4cb9ed83
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+
+import java.util.Optional;
+
+/**
+ * A view for {@link HsSubpartitionView} to find out what data exists in memory or on disk, and to
+ * poll that data.
+ */
+public interface HsDataView {
+
+    /**
+     * Try to consume next buffer.
+     *
+     * <p>Only invoked by consumer thread.
+     *
+     * @param nextBufferToConsume next buffer index to consume.
+     * @return If the target buffer exists, return the buffer and backlog; otherwise return {@link
+     *     Optional#empty()}.
+     */
+    Optional<BufferAndBacklog> consumeBuffer(int nextBufferToConsume) throws Throwable;
+
+    /**
+     * Get dataType of next buffer to consume.
+     *
+     * @param nextBufferToConsume next buffer index to consume
+     * @return next buffer's dataType. If not found in memory, return {@link DataType#NONE}.
+     */
+    DataType peekNextToConsumeDataType(int nextBufferToConsume);
+
+    /**
+     * Get the number of buffers backlog.
+     *
+     * @return backlog of this view's corresponding subpartition.
+     */
+    int getBacklog();
+
+    /** Release this {@link HsDataView} when the related subpartition view is being released. */
+    void releaseDataView();
+}
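
The contract above is index-driven: the caller owns the next-buffer index and
advances it only when a buffer was actually handed out. A hypothetical sketch
of that driving loop, with a simplified view interface standing in for
HsDataView:

    import java.util.Optional;

    class DataViewDriver {
        enum DataType { NONE, DATA_BUFFER, EVENT_BUFFER }

        // Simplified stand-in for HsDataView.
        interface SimpleDataView {
            Optional<String> consumeBuffer(int nextBufferToConsume) throws Throwable;
            DataType peekNextToConsumeDataType(int nextBufferToConsume);
        }

        private int lastConsumedIndex = -1;

        // Advance the index only when a buffer was actually returned.
        Optional<String> pollNext(SimpleDataView view) throws Throwable {
            Optional<String> buffer = view.consumeBuffer(lastConsumedIndex + 1);
            if (buffer.isPresent()) {
                lastConsumedIndex++;
            }
            return buffer;
        }

        // Lets an availability check see what comes next without consuming it.
        DataType peekNext(SimpleDataView view) {
            return view.peekNextToConsumeDataType(lastConsumedIndex + 1);
        }
    }
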
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
index f37a1ff5164..6070a838110 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
@@ -147,7 +147,7 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
     }
 
     /** This method is only called by the result partition to create the subpartitionFileReader. */
-    public HsSubpartitionFileReader registerNewSubpartition(
+    public HsDataView registerNewSubpartition(
             int subpartitionId, HsSubpartitionViewInternalOperations operation) throws IOException {
         synchronized (lock) {
             checkState(!isReleased, "HsFileDataManager is already released.");
@@ -159,7 +159,8 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
                             dataFileChannel,
                             operation,
                             dataIndex,
-                            hybridShuffleConfiguration.getMaxBuffersReadAhead());
+                            hybridShuffleConfiguration.getMaxBuffersReadAhead(),
+                            this::releaseSubpartitionReader);
 
             allReaders.add(subpartitionReader);
 
@@ -172,6 +173,17 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
         IOUtils.deleteFileQuietly(dataFilePath);
     }
 
+    /**
+     * Release the given {@link HsSubpartitionFileReader} from the {@link HsFileDataManager}.
+     *
+     * @param subpartitionFileReader to release.
+     */
+    public void releaseSubpartitionReader(HsSubpartitionFileReader subpartitionFileReader) {
+        synchronized (lock) {
+            removeSubpartitionReaders(Collections.singleton(subpartitionFileReader));
+        }
+    }
+
     /** Releases this file data manager and deletes shuffle data after all readers are removed. */
     public void release() {
         synchronized (lock) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
index 9bf57b51c4b..11228fe7206 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
@@ -121,7 +121,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
      * #subpartitionViewOperationsMap}. It is used to obtain the consumption progress of the
      * subpartition.
      */
-    public void registerSubpartitionView(
+    public HsDataView registerSubpartitionView(
             int subpartitionId, HsSubpartitionViewInternalOperations viewOperations) {
         HsSubpartitionViewInternalOperations oldView =
                 subpartitionViewOperationsMap.put(subpartitionId, viewOperations);
@@ -130,6 +130,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
                     "subpartition : {} register subpartition view will replace old view. ",
                     subpartitionId);
         }
+        return getSubpartitionMemoryDataManager(subpartitionId);
     }
 
     /** Close this {@link HsMemoryDataManager}; after closing, no data can be appended to memory. */
@@ -318,24 +319,4 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
             lock.unlock();
         }
     }
-
-    /** Integrate the buffer and dataType of next buffer. */
-    public static class BufferAndNextDataType {
-        private final Buffer buffer;
-
-        private final Buffer.DataType nextDataType;
-
-        public BufferAndNextDataType(Buffer buffer, Buffer.DataType nextDataType) {
-            this.buffer = buffer;
-            this.nextDataType = nextDataType;
-        }
-
-        public Buffer getBuffer() {
-            return buffer;
-        }
-
-        public Buffer.DataType getNextDataType() {
-            return nextDataType;
-        }
-    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java
index e582be2a620..fbc044d6da0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.util.Queue;
+import java.util.function.Consumer;
 
 /**
  * This component is responsible for reading data from disk for a specific subpartition.
@@ -31,7 +32,7 @@ import java.util.Queue;
  * <p>In order to access the disk as sequentially as possible, {@link HsSubpartitionFileReader}s
  * need to be able to compare priorities.
  */
-public interface HsSubpartitionFileReader extends Comparable<HsSubpartitionFileReader> {
+public interface HsSubpartitionFileReader extends Comparable<HsSubpartitionFileReader>, HsDataView {
     /** Do prep work before this {@link HsSubpartitionFileReader} is scheduled to read data. */
     void prepareForScheduling();
 
@@ -58,6 +59,7 @@ public interface HsSubpartitionFileReader extends Comparable<HsSubpartitionFileR
                 FileChannel dataFileChannel,
                 HsSubpartitionViewInternalOperations operation,
                 HsFileDataIndex dataIndex,
-                int maxBuffersReadAhead);
+                int maxBuffersReadAhead,
+                Consumer<HsSubpartitionFileReader> fileReaderReleaser);
     }
 }
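
Since the interface extends Comparable, a scheduler can keep pending readers
in a priority queue and serve them in an order that is as sequential on disk
as possible. A hedged illustration of the idea, assuming (this is not the real
implementation) that readers compare by the file offset they will read next:

    import java.util.PriorityQueue;

    // Hypothetical reader ordered by the file offset it will read next.
    class OffsetOrderedReader implements Comparable<OffsetOrderedReader> {
        final long nextOffset; // assumed field, not part of the real interface

        OffsetOrderedReader(long nextOffset) {
            this.nextOffset = nextOffset;
        }

        @Override
        public int compareTo(OffsetOrderedReader other) {
            return Long.compare(nextOffset, other.nextOffset);
        }

        public static void main(String[] args) {
            PriorityQueue<OffsetOrderedReader> queue = new PriorityQueue<>();
            queue.add(new OffsetOrderedReader(4096));
            queue.add(new OffsetOrderedReader(0));
            queue.add(new OffsetOrderedReader(1024));
            while (!queue.isEmpty()) {
                // prints offsets in ascending order: 0, 1024, 4096
                System.out.println("read at offset " + queue.poll().nextOffset);
            }
        }
    }
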
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
index ecf02260dd1..a355765ac35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
@@ -23,6 +23,8 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
+import org.apache.flink.util.ExceptionUtils;
 
 import javax.annotation.Nullable;
 
@@ -34,6 +36,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.function.Consumer;
 
 import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.positionToNextBuffer;
 import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.readFromByteChannel;
@@ -62,6 +65,8 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
 
     private final Deque<BufferIndexOrError> loadedBuffers = new LinkedBlockingDeque<>();
 
+    private final Consumer<HsSubpartitionFileReader> fileReaderReleaser;
+
     private volatile boolean isFailed;
 
     public HsSubpartitionFileReaderImpl(
@@ -69,12 +74,14 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
             FileChannel dataFileChannel,
             HsSubpartitionViewInternalOperations operations,
             HsFileDataIndex dataIndex,
-            int maxBufferReadAhead) {
+            int maxBufferReadAhead,
+            Consumer<HsSubpartitionFileReader> fileReaderReleaser) {
         this.subpartitionId = subpartitionId;
         this.dataFileChannel = dataFileChannel;
         this.operations = operations;
         this.bufferIndexManager = new BufferIndexManager(maxBufferReadAhead);
         this.cachedRegionManager = new CachedRegionManager(subpartitionId, dataIndex);
+        this.fileReaderReleaser = fileReaderReleaser;
     }
 
     @Override
@@ -192,10 +199,77 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
         return loadedBuffers;
     }
 
+    @Override
+    public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(int nextBufferToConsume)
+            throws Throwable {
+        if (!checkAndGetFirstBufferIndexOrError(nextBufferToConsume).isPresent()) {
+            return Optional.empty();
+        }
+
+        // already ensured that the peek element is neither null nor a throwable.
+        BufferIndexOrError current = checkNotNull(loadedBuffers.poll());
+
+        BufferIndexOrError next = loadedBuffers.peek();
+
+        Buffer.DataType nextDataType = next == null ? Buffer.DataType.NONE : next.getDataType();
+        int backlog = loadedBuffers.size();
+        int bufferIndex = current.getIndex();
+        Buffer buffer =
+                current.getBuffer()
+                        .orElseThrow(
+                                () ->
+                                        new NullPointerException(
+                                                "Get a non-throwable and non-buffer bufferIndexOrError, which is not allowed"));
+        return Optional.of(
+                ResultSubpartition.BufferAndBacklog.fromBufferAndLookahead(
+                        buffer, nextDataType, backlog, bufferIndex));
+    }
+
+    @Override
+    public Buffer.DataType peekNextToConsumeDataType(int nextBufferToConsume) {
+        Buffer.DataType dataType = Buffer.DataType.NONE;
+        try {
+            dataType =
+                    checkAndGetFirstBufferIndexOrError(nextBufferToConsume)
+                            .map(BufferIndexOrError::getDataType)
+                            .orElse(Buffer.DataType.NONE);
+        } catch (Throwable throwable) {
+            ExceptionUtils.rethrow(throwable);
+        }
+        return dataType;
+    }
+
+    @Override
+    public void releaseDataView() {
+        fileReaderReleaser.accept(this);
+    }
+
+    @Override
+    public int getBacklog() {
+        return loadedBuffers.size();
+    }
+
     // ------------------------------------------------------------------------
     //  Internal Methods
     // ------------------------------------------------------------------------
 
+    private Optional<BufferIndexOrError> checkAndGetFirstBufferIndexOrError(int expectedBufferIndex)
+            throws Throwable {
+        if (loadedBuffers.isEmpty()) {
+            return Optional.empty();
+        }
+
+        BufferIndexOrError peek = loadedBuffers.peek();
+
+        if (peek.getThrowable().isPresent()) {
+            throw peek.getThrowable().get();
+        } else if (peek.getIndex() != expectedBufferIndex) {
+            return Optional.empty();
+        }
+
+        return Optional.of(peek);
+    }
+
     private void moveFileOffsetToBuffer(int bufferIndex) throws IOException {
         Tuple2<Integer, Long> indexAndOffset =
                 cachedRegionManager.getNumSkipAndFileOffset(bufferIndex);
@@ -403,9 +477,15 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
                 FileChannel dataFileChannel,
                 HsSubpartitionViewInternalOperations operation,
                 HsFileDataIndex dataIndex,
-                int maxBuffersReadAhead) {
+                int maxBuffersReadAhead,
+                Consumer<HsSubpartitionFileReader> fileReaderReleaser) {
             return new HsSubpartitionFileReaderImpl(
-                    subpartitionId, dataFileChannel, operation, dataIndex, maxBuffersReadAhead);
+                    subpartitionId,
+                    dataFileChannel,
+                    operation,
+                    dataIndex,
+                    maxBuffersReadAhead,
+                    fileReaderReleaser);
         }
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
index b5079e7df21..17b7e833e29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.ConsumeStatus;
 import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.SpillStatus;
 import org.apache.flink.util.function.SupplierWithException;
@@ -54,7 +55,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * This class is responsible for managing the data in a single subpartition. One {@link
  * HsMemoryDataManager} will hold multiple {@link HsSubpartitionMemoryDataManager}.
  */
-public class HsSubpartitionMemoryDataManager {
+public class HsSubpartitionMemoryDataManager implements HsDataView {
     private final int targetChannel;
 
     private final int bufferSize;
@@ -108,22 +109,24 @@ public class HsSubpartitionMemoryDataManager {
     @SuppressWarnings("FieldAccessNotGuarded")
     // Note that callWithLock ensures the code block is guarded by resultPartitionReadLock and
     // subpartitionLock.
+    @Override
     public DataType peekNextToConsumeDataType(int nextToConsumeIndex) {
         return callWithLock(() -> peekNextToConsumeDataTypeInternal(nextToConsumeIndex));
     }
 
     /**
      * Check whether the head of {@link #unConsumedBuffers} is the buffer to be consumed. If so,
-     * return the buffer and next data type.
+     * return the buffer and backlog.
      *
      * @param toConsumeIndex index of buffer to be consumed.
      * @return If the head of {@link #unConsumedBuffers} is target, return optional of the buffer
-     *     and next data type. Otherwise, return {@link Optional#empty()}.
+     *     and backlog. Otherwise, return {@link Optional#empty()}.
      */
     @SuppressWarnings("FieldAccessNotGuarded")
     // Note that callWithLock ensures the code block is guarded by resultPartitionReadLock and
     // subpartitionLock.
-    public Optional<HsMemoryDataManager.BufferAndNextDataType> consumeBuffer(int toConsumeIndex) {
+    @Override
+    public Optional<BufferAndBacklog> consumeBuffer(int toConsumeIndex) {
         Optional<Tuple2<HsBufferContext, DataType>> bufferAndNextDataType =
                 callWithLock(
                         () -> {
@@ -145,8 +148,22 @@ public class HsSubpartitionMemoryDataManager {
                                 tuple.f0.getBufferIndexAndChannel()));
         return bufferAndNextDataType.map(
                 tuple ->
-                        new HsMemoryDataManager.BufferAndNextDataType(
-                                tuple.f0.getBuffer(), tuple.f1));
+                        new BufferAndBacklog(
+                                tuple.f0.getBuffer(), getBacklog(), tuple.f1, toConsumeIndex));
+    }
+
+    @SuppressWarnings("FieldAccessNotGuarded")
+    // Un-synchronized read of the unConsumedBuffers size to provide the memory data backlog; this
+    // makes the result greater than or equal to the actual backlog, but obtaining an accurate
+    // backlog would bring too much extra overhead.
+    @Override
+    public int getBacklog() {
+        return unConsumedBuffers.size();
+    }
+
+    @Override
+    public void releaseDataView() {
+        // nothing to do for memory data.
     }
 
     // ------------------------------------------------------------------------
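
This hunk replaces the hybrid-shuffle-private BufferAndNextDataType with the
shared ResultSubpartition.BufferAndBacklog, which additionally carries the
backlog and the sequence number of the consumed buffer. A simplified stand-in
for that value type, with field names and ordering taken from the constructor
call in the diff above (types are simplified):

    // Simplified stand-in for ResultSubpartition.BufferAndBacklog.
    final class BufferAndBacklogSketch {
        final Object buffer;        // the consumed buffer (type simplified here)
        final int buffersInBacklog; // buffers still queued behind this one
        final String nextDataType;  // data type of the next buffer, "NONE" if none
        final int sequenceNumber;   // index of the consumed buffer

        BufferAndBacklogSketch(
                Object buffer, int buffersInBacklog, String nextDataType, int sequenceNumber) {
            this.buffer = buffer;
            this.buffersInBacklog = buffersInBacklog;
            this.nextDataType = nextDataType;
            this.sequenceNumber = sequenceNumber;
        }
    }
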
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
new file mode 100644
index 00000000000..5aa7e8bac0b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The read view of HsResultPartition; data can be read from memory or disk. */
+public class HsSubpartitionView
+        implements ResultSubpartitionView, HsSubpartitionViewInternalOperations {
+    private final BufferAvailabilityListener availabilityListener;
+    private final Object lock = new Object();
+
+    /** Index of last consumed buffer. */
+    @GuardedBy("lock")
+    private int lastConsumedBufferIndex = -1;
+
+    @GuardedBy("lock")
+    private boolean needNotify = false;
+
+    @Nullable
+    @GuardedBy("lock")
+    private Buffer.DataType cachedNextDataType = null;
+
+    @Nullable
+    @GuardedBy("lock")
+    private Throwable failureCause = null;
+
+    @GuardedBy("lock")
+    private boolean isReleased = false;
+
+    @Nullable
+    @GuardedBy("lock")
+    // diskDataView can be null only before initialization.
+    private HsDataView diskDataView;
+
+    @Nullable
+    @GuardedBy("lock")
+    // memoryDataView can be null only before initialization.
+    private HsDataView memoryDataView;
+
+    public HsSubpartitionView(BufferAvailabilityListener availabilityListener) {
+        this.availabilityListener = availabilityListener;
+    }
+
+    @Nullable
+    @Override
+    public BufferAndBacklog getNextBuffer() {
+        synchronized (lock) {
+            try {
+                checkNotNull(diskDataView, "disk data view must be not null.");
+                checkNotNull(memoryDataView, "memory data view must be not null.");
+
+                Optional<BufferAndBacklog> bufferToConsume = tryReadFromDisk();
+                if (!bufferToConsume.isPresent()) {
+                    bufferToConsume = memoryDataView.consumeBuffer(lastConsumedBufferIndex + 1);
+                }
+                updateConsumingStatus(bufferToConsume);
+                return bufferToConsume.orElse(null);
+            } catch (Throwable cause) {
+                releaseInternal(cause);
+                return null;
+            }
+        }
+    }
+
+    @Override
+    public void notifyDataAvailable() {
+        boolean notifyDownStream = false;
+        synchronized (lock) {
+            if (isReleased) {
+                return;
+            }
+            if (needNotify) {
+                notifyDownStream = true;
+                needNotify = false;
+            }
+        }
+        // notify outside of lock to avoid deadlock
+        if (notifyDownStream) {
+            availabilityListener.notifyDataAvailable();
+        }
+    }
+
+    @Override
+    public AvailabilityWithBacklog getAvailabilityAndBacklog(int numCreditsAvailable) {
+        synchronized (lock) {
+            boolean availability = numCreditsAvailable > 0;
+            if (numCreditsAvailable <= 0
+                    && cachedNextDataType != null
+                    && cachedNextDataType == Buffer.DataType.EVENT_BUFFER) {
+                availability = true;
+            }
+            return new AvailabilityWithBacklog(availability, diskDataView.getBacklog());
+        }
+    }
+
+    @Override
+    public void releaseAllResources() throws IOException {
+        releaseInternal(null);
+    }
+
+    @Override
+    public boolean isReleased() {
+        synchronized (lock) {
+            return isReleased;
+        }
+    }
+
+    @Override
+    public int getConsumingOffset() {
+        synchronized (lock) {
+            return lastConsumedBufferIndex;
+        }
+    }
+
+    @Override
+    public Throwable getFailureCause() {
+        synchronized (lock) {
+            return failureCause;
+        }
+    }
+
+    /**
+     * Set the disk {@link HsDataView} for this subpartition; this method is only called when the
+     * {@link HsSubpartitionFileReader} is being created.
+     */
+    void setDiskDataView(HsDataView diskDataView) {
+        synchronized (lock) {
+            checkState(this.diskDataView == null, "repeatedly set disk data view is not allowed.");
+            this.diskDataView = diskDataView;
+        }
+    }
+
+    /**
+     * Set the memory {@link HsDataView} for this subpartition; this method is only called when the
+     * {@link HsSubpartitionFileReader} is being created.
+     */
+    void setMemoryDataView(HsDataView memoryDataView) {
+        synchronized (lock) {
+            checkState(
+                    this.memoryDataView == null, "repeatedly set memory data view is not allowed.");
+            this.memoryDataView = memoryDataView;
+        }
+    }
+
+    @Override
+    public void resumeConsumption() {
+        throw new UnsupportedOperationException("resumeConsumption should never be called.");
+    }
+
+    @Override
+    public void acknowledgeAllDataProcessed() {
+        // in case of bounded partitions there is no upstream to acknowledge, we simply ignore
+        // the ack, as there are no checkpoints
+    }
+
+    @SuppressWarnings("FieldAccessNotGuarded")
+    @Override
+    public int unsynchronizedGetNumberOfQueuedBuffers() {
+        return diskDataView.getBacklog();
+    }
+
+    @SuppressWarnings("FieldAccessNotGuarded")
+    @Override
+    public int getNumberOfQueuedBuffers() {
+        return diskDataView.getBacklog();
+    }
+
+    @Override
+    public void notifyNewBufferSize(int newBufferSize) {
+        throw new UnsupportedOperationException("Method should never be called.");
+    }
+
+    // -------------------------------
+    //       Internal Methods
+    // -------------------------------
+
+    @GuardedBy("lock")
+    private Optional<BufferAndBacklog> tryReadFromDisk() throws Throwable {
+        final int nextBufferIndexToConsume = lastConsumedBufferIndex + 1;
+        return checkNotNull(diskDataView)
+                .consumeBuffer(nextBufferIndexToConsume)
+                .map(
+                        bufferAndBacklog -> {
+                            if (bufferAndBacklog.getNextDataType() == Buffer.DataType.NONE) {
+                                return new BufferAndBacklog(
+                                        bufferAndBacklog.buffer(),
+                                        bufferAndBacklog.buffersInBacklog(),
+                                        checkNotNull(memoryDataView)
+                                                .peekNextToConsumeDataType(
+                                                        nextBufferIndexToConsume + 1),
+                                        bufferAndBacklog.getSequenceNumber());
+                            }
+                            return bufferAndBacklog;
+                        });
+    }
+
+    @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+    @GuardedBy("lock")
+    private void updateConsumingStatus(Optional<BufferAndBacklog> bufferAndBacklog) {
+        assert Thread.holdsLock(lock);
+        // if consumed, update and check consume offset
+        if (bufferAndBacklog.isPresent()) {
+            ++lastConsumedBufferIndex;
+            checkState(bufferAndBacklog.get().getSequenceNumber() == lastConsumedBufferIndex);
+        }
+
+        // update need-notify
+        boolean dataAvailable =
+                bufferAndBacklog.map(BufferAndBacklog::isDataAvailable).orElse(false);
+        needNotify = !dataAvailable;
+        // update cached next data type
+        cachedNextDataType = bufferAndBacklog.map(BufferAndBacklog::getNextDataType).orElse(null);
+    }
+
+    private void releaseInternal(@Nullable Throwable throwable) {
+        boolean releaseSubpartitionReader = false;
+        synchronized (lock) {
+            if (isReleased) {
+                return;
+            }
+            isReleased = true;
+            failureCause = throwable;
+            if (diskDataView != null) {
+                releaseSubpartitionReader = true;
+            }
+        }
+        // release subpartition reader outside of lock to avoid deadlock.
+        if (releaseSubpartitionReader) {
+            //noinspection FieldAccessNotGuarded
+            diskDataView.releaseDataView();
+        }
+    }
+}
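
Condensing getNextBuffer and tryReadFromDisk above: the view always tries the
disk view first, falls back to the memory view, and advances the consumed
index only on success. A minimal sketch of that read path, with hypothetical
simplified types in place of the Flink classes:

    import java.util.Optional;

    class TwoTierReadSketch {
        interface View {
            Optional<String> consumeBuffer(int index) throws Throwable;
        }

        private final View diskView;
        private final View memoryView;
        private int lastConsumedIndex = -1;

        TwoTierReadSketch(View diskView, View memoryView) {
            this.diskView = diskView;
            this.memoryView = memoryView;
        }

        String getNextBuffer() throws Throwable {
            int next = lastConsumedIndex + 1;
            Optional<String> buffer = diskView.consumeBuffer(next); // disk first
            if (!buffer.isPresent()) {
                buffer = memoryView.consumeBuffer(next);            // then memory
            }
            if (buffer.isPresent()) {
                lastConsumedIndex = next;
            }
            return buffer.orElse(null);
        }
    }
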
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
index 1a39e14ca02..1a2e28959a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.io.network.partition.hybrid;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
 import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.util.function.BiConsumerWithException;
@@ -40,6 +42,7 @@ import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.time.Duration;
 import java.util.ArrayDeque;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
@@ -396,6 +399,27 @@ class HsFileDataManagerTest {
             this.failConsumer = failConsumer;
         }
 
+        @Override
+        public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(
+                int nextBufferToConsume) {
+            return Optional.empty();
+        }
+
+        @Override
+        public Buffer.DataType peekNextToConsumeDataType(int nextBufferToConsume) {
+            return Buffer.DataType.NONE;
+        }
+
+        @Override
+        public int getBacklog() {
+            return 0;
+        }
+
+        @Override
+        public void releaseDataView() {
+            // do nothing.
+        }
+
         /** Factory for {@link TestingHsSubpartitionFileReader}. */
         private static class Factory implements HsSubpartitionFileReader.Factory {
             private final Queue<HsSubpartitionFileReader> allReaders = new ArrayDeque<>();
@@ -406,7 +430,8 @@ class HsFileDataManagerTest {
                     FileChannel dataFileChannel,
                     HsSubpartitionViewInternalOperations operation,
                     HsFileDataIndex dataIndex,
-                    int maxBuffersReadAhead) {
+                    int maxBuffersReadAhead,
+                    Consumer<HsSubpartitionFileReader> fileReaderReleaser) {
                 return checkNotNull(allReaders.poll());
             }
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
index 36a0833163d..d25a5a34fb3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
@@ -360,6 +361,86 @@ class HsSubpartitionFileReaderImplTest {
         assertThat(fileReader1).isGreaterThan(fileReader2);
     }
 
+    @Test
+    void testConsumeBuffer() throws Throwable {
+        TestingSubpartitionViewInternalOperation viewNotifier =
+                new TestingSubpartitionViewInternalOperation();
+        HsSubpartitionFileReaderImpl subpartitionFileReader =
+                createSubpartitionFileReader(0, viewNotifier);
+
+        // if no preload data in file reader, return Optional.empty.
+        assertThat(subpartitionFileReader.consumeBuffer(0)).isNotPresent();
+
+        // buffers in file: (0-0, 0-1)
+        writeDataToFile(0, 0, 0, 2);
+
+        Queue<MemorySegment> memorySegments = createsMemorySegments(2);
+        // trigger reading, add buffer to queue.
+        subpartitionFileReader.readBuffers(memorySegments, (ignore) -> {});
+
+        // if nextBufferToConsume is not equal to peek elements index, return Optional.empty.
+        assertThat(subpartitionFileReader.consumeBuffer(10)).isNotPresent();
+
+        assertThat(subpartitionFileReader.consumeBuffer(0))
+                .hasValueSatisfying(
+                        (bufferAndBacklog -> {
+                            assertThat(bufferAndBacklog.getNextDataType())
+                                    .isEqualTo(DataType.EVENT_BUFFER);
+                            assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(0);
+                            // first buffer's data is 0.
+                            assertThat(
+                                            bufferAndBacklog
+                                                    .buffer()
+                                                    .getNioBufferReadable()
+                                                    .order(ByteOrder.nativeOrder())
+                                                    .getInt())
+                                    .isEqualTo(0);
+                        }));
+    }
+
+    @Test
+    void testPeekNextToConsumeDataTypeOrConsumeBufferThrowException() {
+        TestingSubpartitionViewInternalOperation viewNotifier =
+                new TestingSubpartitionViewInternalOperation();
+        HsSubpartitionFileReaderImpl subpartitionFileReader =
+                createSubpartitionFileReader(0, viewNotifier);
+
+        subpartitionFileReader.fail(new RuntimeException("expected exception."));
+
+        assertThatThrownBy(() -> subpartitionFileReader.peekNextToConsumeDataType(0))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("expected exception.");
+
+        assertThatThrownBy(() -> subpartitionFileReader.consumeBuffer(0))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("expected exception.");
+    }
+
+    @Test
+    void testPeekNextToConsumeDataType() throws Throwable {
+        TestingSubpartitionViewInternalOperation viewNotifier =
+                new TestingSubpartitionViewInternalOperation();
+        HsSubpartitionFileReaderImpl subpartitionFileReader =
+                createSubpartitionFileReader(0, viewNotifier);
+
+        // if no preload data in file reader, return DataType.NONE.
+        assertThat(subpartitionFileReader.peekNextToConsumeDataType(0)).isEqualTo(DataType.NONE);
+
+        // buffers in file: (0-0, 0-1)
+        writeDataToFile(0, 0, 2);
+
+        Queue<MemorySegment> memorySegments = createsMemorySegments(2);
+        // trigger reading, add buffer to queue.
+        subpartitionFileReader.readBuffers(memorySegments, (ignore) -> {});
+
+        // if nextBufferToConsume is not equal to peek elements index, return DataType.NONE.
+        assertThat(subpartitionFileReader.peekNextToConsumeDataType(10)).isEqualTo(DataType.NONE);
+
+        // if nextBufferToConsume is equal to peek elements index, return the real DataType.
+        assertThat(subpartitionFileReader.peekNextToConsumeDataType(0))
+                .isEqualTo(DataType.DATA_BUFFER);
+    }
+
     private static void checkData(HsSubpartitionFileReaderImpl fileReader, int... expectedData) {
         assertThat(fileReader.getLoadedBuffers()).hasSameSizeAs(expectedData);
         for (int data : expectedData) {
@@ -383,7 +464,12 @@ class HsSubpartitionFileReaderImplTest {
     private HsSubpartitionFileReaderImpl createSubpartitionFileReader(
             int targetChannel, HsSubpartitionViewInternalOperations operations) {
         return new HsSubpartitionFileReaderImpl(
-                targetChannel, dataFileChannel, operations, diskIndex, MAX_BUFFERS_READ_AHEAD);
+                targetChannel,
+                dataFileChannel,
+                operations,
+                diskIndex,
+                MAX_BUFFERS_READ_AHEAD,
+                (ignore) -> {});
     }
 
     private static FileChannel openFileChannel(Path path) throws IOException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java
index a15dfddc9f2..f1b16d0d231 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManager.BufferAndNextDataType;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.ConsumeStatus;
 import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.SpillStatus;
 
@@ -372,14 +372,14 @@ class HsSubpartitionMemoryDataManagerTest {
 
     private static void checkConsumedBufferAndNextDataType(
             List<Tuple2<Integer, Buffer.DataType>> expectedRecords,
-            List<Optional<BufferAndNextDataType>> bufferAndNextDataTypesOpt) {
-        checkArgument(expectedRecords.size() == bufferAndNextDataTypesOpt.size());
-        for (int i = 0; i < bufferAndNextDataTypesOpt.size(); i++) {
+            List<Optional<BufferAndBacklog>> bufferAndBacklogOpt) {
+        checkArgument(expectedRecords.size() == bufferAndBacklogOpt.size());
+        for (int i = 0; i < bufferAndBacklogOpt.size(); i++) {
             final int index = i;
-            assertThat(bufferAndNextDataTypesOpt.get(index))
+            assertThat(bufferAndBacklogOpt.get(index))
                     .hasValueSatisfying(
-                            (bufferAndNextDataType -> {
-                                Buffer buffer = bufferAndNextDataType.getBuffer();
+                            (bufferAndBacklog -> {
+                                Buffer buffer = bufferAndBacklog.buffer();
                                 int value =
                                         buffer.getNioBufferReadable()
                                                 .order(ByteOrder.LITTLE_ENDIAN)
@@ -387,11 +387,11 @@ class HsSubpartitionMemoryDataManagerTest {
                                 Buffer.DataType dataType = buffer.getDataType();
                                 assertThat(value).isEqualTo(expectedRecords.get(index).f0);
                                 assertThat(dataType).isEqualTo(expectedRecords.get(index).f1);
-                                if (index != bufferAndNextDataTypesOpt.size() - 1) {
-                                    assertThat(bufferAndNextDataType.getNextDataType())
+                                if (index != bufferAndBacklogOpt.size() - 1) {
+                                    assertThat(bufferAndBacklog.getNextDataType())
                                             .isEqualTo(expectedRecords.get(index + 1).f1);
                                 } else {
-                                    assertThat(bufferAndNextDataType.getNextDataType())
+                                    assertThat(bufferAndBacklog.getNextDataType())
                                             .isEqualTo(Buffer.DataType.NONE);
                                 }
                             }));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
new file mode 100644
index 00000000000..77a797a80f1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView.AvailabilityWithBacklog;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link HsSubpartitionView}. */
+class HsSubpartitionViewTest {
+    @Test
+    void testGetNextBufferFromDisk() {
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+
+        BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(0, DataType.DATA_BUFFER, 0);
+        CompletableFuture<Void> consumeBufferFromMemoryFuture = new CompletableFuture<>();
+        TestingHsDataView diskDataView =
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (bufferToConsume) -> Optional.of(bufferAndBacklog))
+                        .build();
+        TestingHsDataView memoryDataView =
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (ignore) -> {
+                                    consumeBufferFromMemoryFuture.complete(null);
+                                    return Optional.empty();
+                                })
+                        .build();
+        subpartitionView.setDiskDataView(diskDataView);
+        subpartitionView.setMemoryDataView(memoryDataView);
+
+        BufferAndBacklog nextBuffer = subpartitionView.getNextBuffer();
+        assertThat(consumeBufferFromMemoryFuture).isNotCompleted();
+        assertThat(nextBuffer).isSameAs(bufferAndBacklog);
+    }
+
+    @Test
+    void testGetNextBufferFromDiskNextDataTypeIsNone() {
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+        BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(0, DataType.NONE, 0);
+
+        TestingHsDataView diskDataView =
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (bufferToConsume) -> Optional.of(bufferAndBacklog))
+                        .build();
+
+        TestingHsDataView memoryDataView =
+                TestingHsDataView.builder()
+                        .setPeekNextToConsumeDataTypeFunction(
+                                (bufferToConsume) -> {
+                                    assertThat(bufferToConsume).isEqualTo(1);
+                                    return DataType.EVENT_BUFFER;
+                                })
+                        .build();
+        subpartitionView.setDiskDataView(diskDataView);
+        subpartitionView.setMemoryDataView(memoryDataView);
+
+        BufferAndBacklog nextBuffer = subpartitionView.getNextBuffer();
+        assertThat(nextBuffer).isNotNull();
+        assertThat(nextBuffer.buffer()).isSameAs(bufferAndBacklog.buffer());
+        assertThat(nextBuffer.buffersInBacklog()).isEqualTo(bufferAndBacklog.buffersInBacklog());
+        assertThat(nextBuffer.getSequenceNumber()).isEqualTo(bufferAndBacklog.getSequenceNumber());
+        assertThat(nextBuffer.getNextDataType()).isEqualTo(DataType.EVENT_BUFFER);
+    }
+
+    @Test
+    void testGetNextBufferFromMemory() {
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+
+        BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(0, DataType.DATA_BUFFER, 0);
+        TestingHsDataView memoryDataView =
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (bufferToConsume) -> Optional.of(bufferAndBacklog))
+                        .build();
+        TestingHsDataView diskDataView =
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction((bufferToConsume) -> Optional.empty())
+                        .build();
+        subpartitionView.setDiskDataView(diskDataView);
+        subpartitionView.setMemoryDataView(memoryDataView);
+
+        BufferAndBacklog nextBuffer = subpartitionView.getNextBuffer();
+        assertThat(nextBuffer).isSameAs(bufferAndBacklog);
+    }
+
+    @Test
+    void testGetNextBufferThrowException() {
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+
+        TestingHsDataView diskDataView =
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (nextToConsume) -> {
+                                    throw new RuntimeException("expected exception.");
+                                })
+                        .build();
+        subpartitionView.setDiskDataView(diskDataView);
+        subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
+
+        subpartitionView.getNextBuffer();
+        assertThat(subpartitionView.getFailureCause())
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("expected exception.");
+        assertThat(subpartitionView.isReleased()).isTrue();
+    }
+
+    @Test
+    void testNotifyDataAvailableNeedNotify() {
+        CompletableFuture<Void> notifyAvailableFuture = new CompletableFuture<>();
+        HsSubpartitionView subpartitionView =
+                createSubpartitionView(() -> notifyAvailableFuture.complete(null));
+
+        TestingHsDataView memoryDataView =
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (bufferToConsume) ->
+                                        Optional.of(createBufferAndBacklog(0, DataType.NONE, 0)))
+                        .build();
+        subpartitionView.setMemoryDataView(memoryDataView);
+        subpartitionView.setDiskDataView(TestingHsDataView.NO_OP);
+
+        subpartitionView.getNextBuffer();
+        subpartitionView.notifyDataAvailable();
+        assertThat(notifyAvailableFuture).isCompleted();
+    }
+
+    @Test
+    void testNotifyDataAvailableNotNeedNotify() {
+        CompletableFuture<Void> notifyAvailableFuture = new CompletableFuture<>();
+        HsSubpartitionView subpartitionView =
+                createSubpartitionView(() -> notifyAvailableFuture.complete(null));
+
+        TestingHsDataView memoryDataView =
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (bufferToConsume) ->
+                                        Optional.of(
+                                                createBufferAndBacklog(0, DataType.DATA_BUFFER, 0)))
+                        .build();
+        subpartitionView.setMemoryDataView(memoryDataView);
+        subpartitionView.setDiskDataView(TestingHsDataView.NO_OP);
+
+        subpartitionView.getNextBuffer();
+        subpartitionView.notifyDataAvailable();
+        assertThat(notifyAvailableFuture).isNotCompleted();
+    }
+
+    @Test
+    void testGetAvailabilityAndBacklogPositiveCredit() {
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+        subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
+
+        final int backlog = 2;
+        subpartitionView.setDiskDataView(
+                TestingHsDataView.builder().setGetBacklogSupplier(() -> backlog).build());
+        AvailabilityWithBacklog availabilityAndBacklog =
+                subpartitionView.getAvailabilityAndBacklog(1);
+        assertThat(availabilityAndBacklog.getBacklog()).isEqualTo(backlog);
+        // positive credit always available.
+        assertThat(availabilityAndBacklog.isAvailable()).isTrue();
+    }
+
+    @Test
+    void testGetAvailabilityAndBacklogNonPositiveCreditNextIsData() {
+        final int backlog = 2;
+
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+        subpartitionView.setMemoryDataView(
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (nextToConsume) ->
+                                        Optional.of(
+                                                createBufferAndBacklog(
+                                                        backlog, DataType.DATA_BUFFER, 0)))
+                        .build());
+        subpartitionView.setDiskDataView(
+                TestingHsDataView.builder().setGetBacklogSupplier(() -> backlog).build());
+
+        subpartitionView.getNextBuffer();
+
+        AvailabilityWithBacklog availabilityAndBacklog =
+                subpartitionView.getAvailabilityAndBacklog(0);
+        assertThat(availabilityAndBacklog.getBacklog()).isEqualTo(backlog);
+        // if credit is non-positive, only event can be available.
+        assertThat(availabilityAndBacklog.isAvailable()).isFalse();
+    }
+
+    @Test
+    void testGetAvailabilityAndBacklogNonPositiveCreditNextIsEvent() {
+        final int backlog = 2;
+
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+        subpartitionView.setMemoryDataView(
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (nextToConsume) ->
+                                        Optional.of(
+                                                createBufferAndBacklog(
+                                                        backlog, DataType.EVENT_BUFFER, 0)))
+                        .build());
+        subpartitionView.setDiskDataView(
+                TestingHsDataView.builder().setGetBacklogSupplier(() -> backlog).build());
+
+        subpartitionView.getNextBuffer();
+
+        AvailabilityWithBacklog availabilityAndBacklog =
+                subpartitionView.getAvailabilityAndBacklog(0);
+        assertThat(availabilityAndBacklog.getBacklog()).isEqualTo(backlog);
+        // if credit is non-positive, only event can be available.
+        assertThat(availabilityAndBacklog.isAvailable()).isTrue();
+    }
+
+    @Test
+    void testRelease() throws Exception {
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+        CompletableFuture<Void> releaseDataViewFuture = new CompletableFuture<>();
+        TestingHsDataView diskDataView =
+                TestingHsDataView.builder()
+                        .setReleaseDataViewRunnable(() -> releaseDataViewFuture.complete(null))
+                        .build();
+        subpartitionView.setDiskDataView(diskDataView);
+        subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
+        subpartitionView.releaseAllResources();
+        assertThat(subpartitionView.isReleased()).isTrue();
+        assertThat(releaseDataViewFuture).isCompleted();
+    }
+
+    @Test
+    void testGetConsumingOffset() {
+        AtomicInteger nextBufferIndex = new AtomicInteger(0);
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+        TestingHsDataView diskDataView =
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (toConsumeBuffer) ->
+                                        Optional.of(
+                                                createBufferAndBacklog(
+                                                        0,
+                                                        DataType.DATA_BUFFER,
+                                                        nextBufferIndex.getAndIncrement())))
+                        .build();
+        subpartitionView.setDiskDataView(diskDataView);
+        subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
+
+        assertThat(subpartitionView.getConsumingOffset()).isEqualTo(-1);
+        subpartitionView.getNextBuffer();
+        assertThat(subpartitionView.getConsumingOffset()).isEqualTo(0);
+        subpartitionView.getNextBuffer();
+        assertThat(subpartitionView.getConsumingOffset()).isEqualTo(1);
+    }
+
+    @Test
+    void testSetDataViewRepeatedly() {
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+
+        subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
+        assertThatThrownBy(() -> subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("repeatedly set memory data view is not allowed.");
+
+        subpartitionView.setDiskDataView(TestingHsDataView.NO_OP);
+        assertThatThrownBy(() -> subpartitionView.setDiskDataView(TestingHsDataView.NO_OP))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("repeatedly set disk data view is not allowed.");
+    }
+
+    private static HsSubpartitionView createSubpartitionView() {
+        return new HsSubpartitionView(new NoOpBufferAvailablityListener());
+    }
+
+    private static HsSubpartitionView createSubpartitionView(
+            BufferAvailabilityListener bufferAvailabilityListener) {
+        return new HsSubpartitionView(bufferAvailabilityListener);
+    }
+
+    private static BufferAndBacklog createBufferAndBacklog(
+            int buffersInBacklog, DataType nextDataType, int sequenceNumber) {
+        final int bufferSize = 8;
+        Buffer buffer = HybridShuffleTestUtils.createBuffer(bufferSize, true);
+        return new BufferAndBacklog(buffer, buffersInBacklog, nextDataType, sequenceNumber);
+    }
+}
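
As a reading aid, the consumption order these tests pin down can be condensed
into a short, hypothetical sketch (not the actual HsSubpartitionView
implementation): the disk data view is polled first, and the memory data view
is only consulted when the disk view returns nothing.

    // Disk-first consumption, as exercised by testGetNextBufferFromDisk and
    // testGetNextBufferFromMemory above; consumeBuffer may throw, which the
    // real view surfaces via getFailureCause() before releasing itself.
    Optional<BufferAndBacklog> next = diskDataView.consumeBuffer(nextBufferToConsume);
    if (!next.isPresent()) {
        next = memoryDataView.consumeBuffer(nextBufferToConsume);
    }
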
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingHsDataView.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingHsDataView.java
new file mode 100644
index 00000000000..343bd9586df
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingHsDataView.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/** Mock {@link HsDataView} for testing. */
+public class TestingHsDataView implements HsDataView {
+    public static final TestingHsDataView NO_OP = TestingHsDataView.builder().build();
+
+    private final FunctionWithException<
+                    Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>
+            consumeBufferFunction;
+
+    private final Function<Integer, Buffer.DataType> peekNextToConsumeDataTypeFunction;
+
+    private final Supplier<Integer> getBacklogSupplier;
+
+    private final Runnable releaseDataViewRunnable;
+
+    private TestingHsDataView(
+            FunctionWithException<Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>
+                    consumeBufferFunction,
+            Function<Integer, Buffer.DataType> peekNextToConsumeDataTypeFunction,
+            Supplier<Integer> getBacklogSupplier,
+            Runnable releaseDataViewRunnable) {
+        this.consumeBufferFunction = consumeBufferFunction;
+        this.peekNextToConsumeDataTypeFunction = peekNextToConsumeDataTypeFunction;
+        this.getBacklogSupplier = getBacklogSupplier;
+        this.releaseDataViewRunnable = releaseDataViewRunnable;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    @Override
+    public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(int nextBufferToConsume)
+            throws Throwable {
+        return consumeBufferFunction.apply(nextBufferToConsume);
+    }
+
+    @Override
+    public Buffer.DataType peekNextToConsumeDataType(int nextBufferToConsume) {
+        return peekNextToConsumeDataTypeFunction.apply(nextBufferToConsume);
+    }
+
+    @Override
+    public int getBacklog() {
+        return getBacklogSupplier.get();
+    }
+
+    @Override
+    public void releaseDataView() {
+        releaseDataViewRunnable.run();
+    }
+
+    /** Builder for {@link TestingHsDataView}. */
+    public static class Builder {
+        private FunctionWithException<
+                        Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>
+                consumeBufferFunction = (ignore) -> Optional.empty();
+
+        private Function<Integer, Buffer.DataType> peekNextToConsumeDataTypeFunction =
+                (ignore) -> Buffer.DataType.NONE;
+
+        private Supplier<Integer> getBacklogSupplier = () -> 0;
+
+        private Runnable releaseDataViewRunnable = () -> {};
+
+        private Builder() {}
+
+        public Builder setConsumeBufferFunction(
+                FunctionWithException<
+                                Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>
+                        consumeBufferFunction) {
+            this.consumeBufferFunction = consumeBufferFunction;
+            return this;
+        }
+
+        public Builder setPeekNextToConsumeDataTypeFunction(
+                Function<Integer, Buffer.DataType> peekNextToConsumeDataTypeFunction) {
+            this.peekNextToConsumeDataTypeFunction = peekNextToConsumeDataTypeFunction;
+            return this;
+        }
+
+        public Builder setGetBacklogSupplier(Supplier<Integer> getBacklogSupplier) {
+            this.getBacklogSupplier = getBacklogSupplier;
+            return this;
+        }
+
+        public Builder setReleaseDataViewRunnable(Runnable releaseDataViewRunnable) {
+            this.releaseDataViewRunnable = releaseDataViewRunnable;
+            return this;
+        }
+
+        public TestingHsDataView build() {
+            return new TestingHsDataView(
+                    consumeBufferFunction,
+                    peekNextToConsumeDataTypeFunction,
+                    getBacklogSupplier,
+                    releaseDataViewRunnable);
+        }
+    }
+}
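
TestingHsDataView follows the builder-based test-double pattern used across
this patch series: every callback defaults to a no-op, and a test overrides
only the behavior it cares about. A minimal usage fragment:

    // Stub only the backlog supplier; the consume, peek and release
    // callbacks keep their no-op defaults from the builder.
    TestingHsDataView dataView =
            TestingHsDataView.builder()
                    .setGetBacklogSupplier(() -> 3)
                    .build();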


[flink] 09/13: [FLINK-27908] HybridShuffleConfiguration supports setting the spilling strategy type.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e2d50c4777d2d5ae8e6f5857427832b7460d3893
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Jul 27 23:00:45 2022 +0800

    [FLINK-27908] HybridShuffleConfiguration supports setting the spilling strategy type.
---
 .../hybrid/HybridShuffleConfiguration.java         | 32 ++++++++++++++++++++--
 1 file changed, 29 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
index 144e2ccea5a..18cadb71b05 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
@@ -36,12 +36,17 @@ public class HybridShuffleConfiguration {
 
     private static final float DEFAULT_FULL_STRATEGY_RELEASE_BUFFER_RATIO = 0.4f;
 
+    private static final SpillingStrategyType DEFAULT_SPILLING_STRATEGY_NAME =
+            SpillingStrategyType.FULL;
+
     private final int maxBuffersReadAhead;
 
     private final Duration bufferRequestTimeout;
 
     private final int maxRequestedBuffers;
 
+    private final SpillingStrategyType spillingStrategyType;
+
     // ----------------------------------------
     //        Selective Spilling Strategy
     // ----------------------------------------
@@ -66,7 +71,8 @@ public class HybridShuffleConfiguration {
             float selectiveStrategySpillBufferRatio,
             int fullStrategyNumBuffersTriggerSpilling,
             float fullStrategyReleaseThreshold,
-            float fullStrategyReleaseBufferRatio) {
+            float fullStrategyReleaseBufferRatio,
+            SpillingStrategyType spillingStrategyType) {
         this.maxBuffersReadAhead = maxBuffersReadAhead;
         this.bufferRequestTimeout = bufferRequestTimeout;
         this.maxRequestedBuffers = maxRequestedBuffers;
@@ -75,12 +81,18 @@ public class HybridShuffleConfiguration {
         this.fullStrategyNumBuffersTriggerSpilling = fullStrategyNumBuffersTriggerSpilling;
         this.fullStrategyReleaseThreshold = fullStrategyReleaseThreshold;
         this.fullStrategyReleaseBufferRatio = fullStrategyReleaseBufferRatio;
+        this.spillingStrategyType = spillingStrategyType;
     }
 
     public static Builder builder(int numSubpartitions, int numBuffersPerRequest) {
         return new Builder(numSubpartitions, numBuffersPerRequest);
     }
 
+    /** Get {@link SpillingStrategyType} for hybrid shuffle mode. */
+    public SpillingStrategyType getSpillingStrategyType() {
+        return spillingStrategyType;
+    }
+
     public int getMaxRequestedBuffers() {
         return maxRequestedBuffers;
     }
@@ -135,8 +147,14 @@ public class HybridShuffleConfiguration {
         return fullStrategyReleaseBufferRatio;
     }
 
+    /** Type of {@link HsSpillingStrategy}. */
+    public enum SpillingStrategyType {
+        FULL,
+        SELECTIVE
+    }
+
     /** Builder for {@link HybridShuffleConfiguration}. */
-    static class Builder {
+    public static class Builder {
         private int maxBuffersReadAhead = DEFAULT_MAX_BUFFERS_READ_AHEAD;
 
         private Duration bufferRequestTimeout = DEFAULT_BUFFER_REQUEST_TIMEOUT;
@@ -153,6 +171,8 @@ public class HybridShuffleConfiguration {
 
         private float fullStrategyReleaseBufferRatio = DEFAULT_FULL_STRATEGY_RELEASE_BUFFER_RATIO;
 
+        private SpillingStrategyType spillingStrategyType = DEFAULT_SPILLING_STRATEGY_NAME;
+
         private final int numSubpartitions;
 
         private final int numBuffersPerRequest;
@@ -199,6 +219,11 @@ public class HybridShuffleConfiguration {
             return this;
         }
 
+        public Builder setSpillingStrategyType(SpillingStrategyType spillingStrategyType) {
+            this.spillingStrategyType = spillingStrategyType;
+            return this;
+        }
+
         public HybridShuffleConfiguration build() {
             return new HybridShuffleConfiguration(
                     maxBuffersReadAhead,
@@ -208,7 +233,8 @@ public class HybridShuffleConfiguration {
                     selectiveStrategySpillBufferRatio,
                     fullStrategyNumBuffersTriggerSpilling,
                     fullStrategyReleaseThreshold,
-                    fullStrategyReleaseBufferRatio);
+                    fullStrategyReleaseBufferRatio,
+                    spillingStrategyType);
         }
     }
 }
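
For illustration, a hedged sketch of how the new option might be used; the
numeric builder arguments (subpartition count and buffers per request) are
placeholders, not recommended values:

    // Pick the SELECTIVE spilling strategy instead of the FULL default.
    HybridShuffleConfiguration config =
            HybridShuffleConfiguration.builder(10, 100)
                    .setSpillingStrategyType(
                            HybridShuffleConfiguration.SpillingStrategyType.SELECTIVE)
                    .build();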


[flink] 06/13: [FLINK-27908] ResultPartition's subclasses use setupInternal instead of setup to do initialization work.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e930077b9057aae6f634e8638f38949299881887
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Jul 28 16:54:04 2022 +0800

    [FLINK-27908] ResultPartition's subclasses use setupInternal instead of setup to do initialization work.
---
 .../runtime/io/network/partition/BufferWritingResultPartition.java    | 4 +---
 .../apache/flink/runtime/io/network/partition/ResultPartition.java    | 4 ++++
 .../flink/runtime/io/network/partition/SortMergeResultPartition.java  | 4 +---
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
index 2422f3ad808..8b36a7587d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
@@ -94,9 +94,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition {
     }
 
     @Override
-    public void setup() throws IOException {
-        super.setup();
-
+    protected void setupInternal() throws IOException {
         checkState(
                 bufferPool.getNumberOfRequiredMemorySegments() >= getNumberOfSubpartitions(),
                 "Bug in result partition setup logic: Buffer pool has not enough guaranteed buffers for"
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index ccdf8f4a138..e3b2e2ff74e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -158,9 +158,13 @@ public abstract class ResultPartition implements ResultPartitionWriter {
                 "Bug in result partition setup logic: Already registered buffer pool.");
 
         this.bufferPool = checkNotNull(bufferPoolFactory.get());
+        setupInternal();
         partitionManager.registerResultPartition(this);
     }
 
+    /** Do the subclass's own setup operation. */
+    protected abstract void setupInternal() throws IOException;
+
     public String getOwningTaskName() {
         return owningTaskName;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index 1620c36beea..a8c678db053 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -173,7 +173,7 @@ public class SortMergeResultPartition extends ResultPartition {
     }
 
     @Override
-    public void setup() throws IOException {
+    protected void setupInternal() throws IOException {
         synchronized (lock) {
             if (isReleased()) {
                 throw new IOException("Result partition has been released.");
@@ -189,8 +189,6 @@ public class SortMergeResultPartition extends ResultPartition {
 
         // initialize the buffer pool eagerly to avoid reporting errors such as OOM too late
         readBufferPool.initialize();
-        super.setup();
-
         LOG.info("Sort-merge partition {} initialized.", getPartitionId());
     }
 
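
This is the classic template method pattern. A simplified sketch of the
resulting shape (condensed from the diff above; import of java.io.IOException
elided):

    public abstract class ResultPartition {
        public void setup() throws IOException {
            // common work: obtain the buffer pool, then run the subclass's
            // own initialization before registering with the partition manager.
            setupInternal();
        }

        /** Do the subclass's own setup operation. */
        protected abstract void setupInternal() throws IOException;
    }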


[flink] 10/13: [FLINK-27908] Let HsMemoryDataManager register HsSubpartitionViewInternalOperations and support notifyDataAvailable.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 347423f8464c530efd308c5ea7abb34a5808e0c9
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Jul 28 21:06:12 2022 +0800

    [FLINK-27908] Let HsMemoryDataManager register HsSubpartitionViewInternalOperations and support notifyDataAvailable.
---
 .../partition/hybrid/HsMemoryDataManager.java      | 44 ++++++++++++++++++++--
 .../hybrid/HsMemoryDataManagerOperation.java       |  7 ++++
 .../hybrid/HsSubpartitionFileReaderImpl.java       |  4 +-
 .../hybrid/HsSubpartitionMemoryDataManager.java    |  5 +--
 .../HsSubpartitionViewInternalOperations.java      |  4 +-
 .../hybrid/TestingMemoryDataManagerOperation.java  | 21 ++++++++++-
 .../TestingSubpartitionViewInternalOperation.java  |  2 +-
 7 files changed, 74 insertions(+), 13 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
index 7acc9d0bdbb..9bf57b51c4b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
@@ -26,16 +26,19 @@ import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy.D
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.SupplierWithException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -44,6 +47,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 /** This class is responsible for managing data in memory. */
 public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryDataManagerOperation {
 
+    private static final Logger LOG = LoggerFactory.getLogger(HsMemoryDataManager.class);
+
     private final int numSubpartitions;
 
     private final HsSubpartitionMemoryDataManager[] subpartitionMemoryDataManagers;
@@ -62,6 +67,9 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
 
     private final AtomicInteger numUnSpillBuffers = new AtomicInteger(0);
 
+    private final Map<Integer, HsSubpartitionViewInternalOperations> subpartitionViewOperationsMap =
+            new ConcurrentHashMap<>();
+
     public HsMemoryDataManager(
             int numSubpartitions,
             int bufferSize,
@@ -108,6 +116,22 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
         }
     }
 
+    /**
+     * Register {@link HsSubpartitionViewInternalOperations} to {@link
+     * #subpartitionViewOperationsMap}. It is used to obtain the consumption progress of the
+     * subpartition.
+     */
+    public void registerSubpartitionView(
+            int subpartitionId, HsSubpartitionViewInternalOperations viewOperations) {
+        HsSubpartitionViewInternalOperations oldView =
+                subpartitionViewOperationsMap.put(subpartitionId, viewOperations);
+        if (oldView != null) {
+            LOG.debug(
+                    "subpartition : {} register subpartition view will replace old view. ",
+                    subpartitionId);
+        }
+    }
+
     /** Close this {@link HsMemoryDataManager}; it means no more data can be appended to memory. */
     public void close() {
         Decision decision = callWithLock(() -> spillStrategy.onResultPartitionClosed(this));
@@ -159,8 +183,13 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
     // Write lock should be acquired before invoke this method.
     @Override
     public List<Integer> getNextBufferIndexToConsume() {
-        // TODO implements this logical when subpartition view is implemented.
-        return Collections.emptyList();
+        ArrayList<Integer> consumeIndexes = new ArrayList<>(numSubpartitions);
+        for (int channel = 0; channel < numSubpartitions; channel++) {
+            HsSubpartitionViewInternalOperations viewOperation =
+                    subpartitionViewOperationsMap.get(channel);
+            consumeIndexes.add(viewOperation == null ? -1 : viewOperation.getConsumingOffset() + 1);
+        }
+        return consumeIndexes;
     }
 
     // ------------------------------------
@@ -196,6 +225,15 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
         handleDecision(decision);
     }
 
+    @Override
+    public void onDataAvailable(int subpartitionId) {
+        HsSubpartitionViewInternalOperations subpartitionViewInternalOperations =
+                subpartitionViewOperationsMap.get(subpartitionId);
+        if (subpartitionViewInternalOperations != null) {
+            subpartitionViewInternalOperations.notifyDataAvailable();
+        }
+    }
+
     // ------------------------------------
     //           Internal Method
     // ------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java
index d34b251a700..ca86a1ae913 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java
@@ -49,4 +49,11 @@ public interface HsMemoryDataManagerOperation {
 
     /** This method is called when buffer is finished. */
     void onBufferFinished();
+
+    /**
+     * This method is called when subpartition data becomes available.
+     *
+     * @param subpartitionId the subpartition that needs to be notified of data availability.
+     */
+    void onDataAvailable(int subpartitionId);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
index 2f29f2409ab..ecf02260dd1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
@@ -152,7 +152,7 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
         }
 
         if (loadedBuffers.size() <= numLoaded) {
-            operations.notifyDataAvailableFromDisk();
+            operations.notifyDataAvailable();
         }
     }
 
@@ -171,7 +171,7 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
         }
 
         loadedBuffers.add(BufferIndexOrError.newError(failureCause));
-        operations.notifyDataAvailableFromDisk();
+        operations.notifyDataAvailable();
     }
 
     /** Refresh downstream consumption progress for another round scheduling of reading. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
index 8084a8883f3..b5079e7df21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
@@ -351,11 +351,10 @@ public class HsSubpartitionMemoryDataManager {
                                     bufferContext.getBufferIndexAndChannel().getBufferIndex(),
                                     bufferContext);
                             trimHeadingReleasedBuffers(unConsumedBuffers);
-                            return unConsumedBuffers.isEmpty();
+                            return unConsumedBuffers.size() <= 1;
                         });
         if (needNotify) {
-            // TODO notify data available, the notification mechanism may need further
-            // consideration.
+            memoryDataManagerOperation.onDataAvailable(targetChannel);
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java
index 1cd6f97ac83..053e5a291ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java
@@ -24,8 +24,8 @@ package org.apache.flink.runtime.io.network.partition.hybrid;
  */
 public interface HsSubpartitionViewInternalOperations {
 
-    /** Callback for new data become available from disk. */
-    void notifyDataAvailableFromDisk();
+    /** Callback for new data become available. */
+    void notifyDataAvailable();
 
     /** Get the latest consuming offset of the subpartition. */
     int getConsumingOffset();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingMemoryDataManagerOperation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingMemoryDataManagerOperation.java
index f78774ca674..bb441d667af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingMemoryDataManagerOperation.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingMemoryDataManagerOperation.java
@@ -35,16 +35,20 @@ public class TestingMemoryDataManagerOperation implements HsMemoryDataManagerOpe
 
     private final Runnable onBufferFinishedRunnable;
 
+    private final Runnable onDataAvailableRunnable;
+
     private TestingMemoryDataManagerOperation(
             SupplierWithException<BufferBuilder, InterruptedException>
                     requestBufferFromPoolSupplier,
             BiConsumer<Integer, Integer> markBufferReadableConsumer,
             Consumer<BufferIndexAndChannel> onBufferConsumedConsumer,
-            Runnable onBufferFinishedRunnable) {
+            Runnable onBufferFinishedRunnable,
+            Runnable onDataAvailableRunnable) {
         this.requestBufferFromPoolSupplier = requestBufferFromPoolSupplier;
         this.markBufferReadableConsumer = markBufferReadableConsumer;
         this.onBufferConsumedConsumer = onBufferConsumedConsumer;
         this.onBufferFinishedRunnable = onBufferFinishedRunnable;
+        this.onDataAvailableRunnable = onDataAvailableRunnable;
     }
 
     @Override
@@ -67,6 +71,11 @@ public class TestingMemoryDataManagerOperation implements HsMemoryDataManagerOpe
         onBufferFinishedRunnable.run();
     }
 
+    @Override
+    public void onDataAvailable(int subpartitionId) {
+        onDataAvailableRunnable.run();
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -82,6 +91,8 @@ public class TestingMemoryDataManagerOperation implements HsMemoryDataManagerOpe
 
         private Runnable onBufferFinishedRunnable = () -> {};
 
+        private Runnable onDataAvailableRunnable = () -> {};
+
         public Builder setRequestBufferFromPoolSupplier(
                 SupplierWithException<BufferBuilder, InterruptedException>
                         requestBufferFromPoolSupplier) {
@@ -106,6 +117,11 @@ public class TestingMemoryDataManagerOperation implements HsMemoryDataManagerOpe
             return this;
         }
 
+        public Builder setOnDataAvailableRunnable(Runnable onDataAvailableRunnable) {
+            this.onDataAvailableRunnable = onDataAvailableRunnable;
+            return this;
+        }
+
         private Builder() {}
 
         public TestingMemoryDataManagerOperation build() {
@@ -113,7 +129,8 @@ public class TestingMemoryDataManagerOperation implements HsMemoryDataManagerOpe
                     requestBufferFromPoolSupplier,
                     markBufferReadableConsumer,
                     onBufferConsumedConsumer,
-                    onBufferFinishedRunnable);
+                    onBufferFinishedRunnable,
+                    onDataAvailableRunnable);
         }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java
index 9042d9d377d..7ac32be04f9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java
@@ -27,7 +27,7 @@ public class TestingSubpartitionViewInternalOperation
     private Runnable notifyDataAvailableRunnable = () -> {};
 
     @Override
-    public void notifyDataAvailableFromDisk() {
+    public void notifyDataAvailable() {
         notifyDataAvailableRunnable.run();
     }
 
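
The consumption-progress lookup added to getNextBufferIndexToConsume() can be
summarized in a self-contained fragment (java.util imports elided; the local
View interface stands in for HsSubpartitionViewInternalOperations):

    interface View {
        int getConsumingOffset();
    }

    // Unregistered subpartitions report -1; registered ones report
    // (consuming offset + 1), i.e. the next buffer index to consume.
    static List<Integer> nextBufferIndexToConsume(int numSubpartitions, Map<Integer, View> views) {
        List<Integer> consumeIndexes = new ArrayList<>(numSubpartitions);
        for (int channel = 0; channel < numSubpartitions; channel++) {
            View view = views.get(channel);
            consumeIndexes.add(view == null ? -1 : view.getConsumingOffset() + 1);
        }
        return consumeIndexes;
    }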


[flink] 03/13: [hotfix] Rename HsResultPartitionReadScheduler to HsFileDataManager, as it plays the same role as the FileDataManager mentioned in the FLIP.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9cde5322969d7c92a76d615576f1cdd52454b33e
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Jul 28 17:41:17 2022 +0800

    [hotfix] Rename HsResultPartitionReadScheduler to HsFileDataManager
     Rename HsResultPartitionReadScheduler to HsFileDataManager, as it plays the same role as the FileDataManager mentioned in the FLIP.
---
 ...onReadScheduler.java => HsFileDataManager.java} | 20 ++++----
 ...hedulerTest.java => HsFileDataManagerTest.java} | 60 +++++++++++-----------
 2 files changed, 40 insertions(+), 40 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
similarity index 95%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadScheduler.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
index ef742b61dee..dab671aa723 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
@@ -53,12 +53,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * IO scheduler for HsResultPartition, which schedules {@link HsSubpartitionFileReaderImpl} for
+ * File data manager for HsResultPartition, which schedules {@link HsSubpartitionFileReaderImpl} for
  * loading data w.r.t. their offset in the file.
  */
 @ThreadSafe
-public class HsResultPartitionReadScheduler implements Runnable, BufferRecycler {
-    private static final Logger LOG = LoggerFactory.getLogger(HsResultPartitionReadScheduler.class);
+public class HsFileDataManager implements Runnable, BufferRecycler {
+    private static final Logger LOG = LoggerFactory.getLogger(HsFileDataManager.class);
 
     /** Executor to run the shuffle data reading task. */
     private final Executor ioExecutor;
@@ -76,8 +76,8 @@ public class HsResultPartitionReadScheduler implements Runnable, BufferRecycler
     private final Object lock = new Object();
 
     /**
-     * A {@link CompletableFuture} to be completed when this read scheduler including all resources
-     * is released.
+     * A {@link CompletableFuture} to be completed when this data manager including all resources is
+     * released.
      */
     @GuardedBy("lock")
     private final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
@@ -112,14 +112,14 @@ public class HsResultPartitionReadScheduler implements Runnable, BufferRecycler
     @GuardedBy("lock")
     private volatile int numRequestedBuffers;
 
-    /** Whether this read scheduler has been released or not. */
+    /** Whether this file data manager has been released or not. */
     @GuardedBy("lock")
     private volatile boolean isReleased;
 
     @GuardedBy("lock")
     private FileChannel dataFileChannel;
 
-    public HsResultPartitionReadScheduler(
+    public HsFileDataManager(
             BatchShuffleReadBufferPool bufferPool,
             Executor ioExecutor,
             HsFileDataIndex dataIndex,
@@ -149,7 +149,7 @@ public class HsResultPartitionReadScheduler implements Runnable, BufferRecycler
     public HsSubpartitionFileReader registerNewSubpartition(
             int subpartitionId, HsSubpartitionViewInternalOperations operation) throws IOException {
         synchronized (lock) {
-            checkState(!isReleased, "HsResultPartitionReadScheduler is already released.");
+            checkState(!isReleased, "HsFileDataManager is already released.");
             lazyInitialize();
 
             HsSubpartitionFileReader subpartitionReader =
@@ -168,8 +168,8 @@ public class HsResultPartitionReadScheduler implements Runnable, BufferRecycler
     }
 
     /**
-     * Releases this read scheduler and returns a {@link CompletableFuture} which will be completed
-     * when all resources are released.
+     * Releases this file data manager and returns a {@link CompletableFuture} which will be
+     * completed when all resources are released.
      */
     public CompletableFuture<?> release() {
         synchronized (lock) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
similarity index 88%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadSchedulerTest.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
index c943665b3b0..1a39e14ca02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
@@ -51,9 +51,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Tests for {@link HsResultPartitionReadScheduler}. */
+/** Tests for {@link HsFileDataManager}. */
 @ExtendWith(TestLoggerExtension.class)
-class HsResultPartitionReadSchedulerTest {
+class HsFileDataManagerTest {
     private static final int BUFFER_SIZE = 1024;
 
     private static final int NUM_SUBPARTITIONS = 10;
@@ -70,7 +70,7 @@ class HsResultPartitionReadSchedulerTest {
 
     private Path dataFilePath;
 
-    private HsResultPartitionReadScheduler readScheduler;
+    private HsFileDataManager fileDataManager;
 
     private TestingSubpartitionViewInternalOperation subpartitionViewOperation;
 
@@ -85,8 +85,8 @@ class HsResultPartitionReadSchedulerTest {
         dataFilePath = Files.createFile(tempDir.resolve(".data"));
         dataFileChannel = openFileChannel(dataFilePath);
         factory = new TestingHsSubpartitionFileReader.Factory();
-        readScheduler =
-                new HsResultPartitionReadScheduler(
+        fileDataManager =
+                new HsFileDataManager(
                         bufferPool,
                         ioExecutor,
                         new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
@@ -117,7 +117,7 @@ class HsResultPartitionReadSchedulerTest {
 
         assertThat(reader.readBuffers).isEmpty();
 
-        readScheduler.registerNewSubpartition(0, subpartitionViewOperation);
+        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
 
         ioExecutor.trigger();
 
@@ -136,15 +136,15 @@ class HsResultPartitionReadSchedulerTest {
 
         factory.allReaders.add(reader);
 
-        readScheduler.registerNewSubpartition(0, subpartitionViewOperation);
+        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
 
         ioExecutor.trigger();
 
         assertThat(reader.readBuffers).hasSize(BUFFER_POOL_SIZE);
         assertThat(bufferPool.getAvailableBuffers()).isZero();
 
-        readScheduler.recycle(reader.readBuffers.poll());
-        readScheduler.recycle(reader.readBuffers.poll());
+        fileDataManager.recycle(reader.readBuffers.poll());
+        fileDataManager.recycle(reader.readBuffers.poll());
 
         // recycle buffer will push new runnable to ioExecutor.
         ioExecutor.trigger();
@@ -163,12 +163,12 @@ class HsResultPartitionReadSchedulerTest {
                     assertThat(prepareForSchedulingFinished).isCompleted();
                     assertThat(requestedBuffers).hasSize(BUFFER_POOL_SIZE);
                     assertThat(bufferPool.getAvailableBuffers()).isEqualTo(0);
-                    // read one buffer, return another buffer to scheduler.
+                    // read one buffer, return another buffer to data manager.
                     readBuffers.add(requestedBuffers.poll());
                 });
         factory.allReaders.add(reader);
 
-        readScheduler.registerNewSubpartition(0, subpartitionViewOperation);
+        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
 
         ioExecutor.trigger();
 
@@ -176,7 +176,7 @@ class HsResultPartitionReadSchedulerTest {
         assertThat(bufferPool.getAvailableBuffers()).isEqualTo(1);
     }
 
-    /** Test scheduler will schedule readers in order. */
+    /** Test file data manager will schedule readers in order. */
     @Test
     void testScheduleReadersOrdered() throws Exception {
         TestingHsSubpartitionFileReader reader1 = new TestingHsSubpartitionFileReader();
@@ -201,8 +201,8 @@ class HsResultPartitionReadSchedulerTest {
         factory.allReaders.add(reader1);
         factory.allReaders.add(reader2);
 
-        readScheduler.registerNewSubpartition(0, subpartitionViewOperation);
-        readScheduler.registerNewSubpartition(1, subpartitionViewOperation);
+        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
+        fileDataManager.registerNewSubpartition(1, subpartitionViewOperation);
 
         // trigger run.
         ioExecutor.trigger();
@@ -218,8 +218,8 @@ class HsResultPartitionReadSchedulerTest {
         bufferPool.requestBuffers();
         assertThat(bufferPool.getAvailableBuffers()).isZero();
 
-        readScheduler =
-                new HsResultPartitionReadScheduler(
+        fileDataManager =
+                new HsFileDataManager(
                         bufferPool,
                         ioExecutor,
                         new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
@@ -237,7 +237,7 @@ class HsResultPartitionReadSchedulerTest {
         reader.setFailConsumer((cause::complete));
         factory.allReaders.add(reader);
 
-        readScheduler.registerNewSubpartition(0, subpartitionViewOperation);
+        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
 
         ioExecutor.trigger();
 
@@ -263,7 +263,7 @@ class HsResultPartitionReadSchedulerTest {
                 });
         factory.allReaders.add(reader);
 
-        readScheduler.registerNewSubpartition(0, subpartitionViewOperation);
+        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
 
         ioExecutor.trigger();
 
@@ -275,7 +275,7 @@ class HsResultPartitionReadSchedulerTest {
 
     // ----------------------- test release ---------------------------------------
 
-    /** Test scheduler release when reader is reading buffers. */
+    /** Test file data manager release when reader is reading buffers. */
     @Test
     @Timeout(10)
     void testReleasedWhenReading() throws Exception {
@@ -284,14 +284,14 @@ class HsResultPartitionReadSchedulerTest {
         CompletableFuture<Throwable> cause = new CompletableFuture<>();
         reader.setFailConsumer((cause::complete));
         CompletableFuture<Void> readBufferStart = new CompletableFuture<>();
-        CompletableFuture<Void> schedulerReleasedFinish = new CompletableFuture<>();
+        CompletableFuture<Void> releasedFinish = new CompletableFuture<>();
         reader.setReadBuffersConsumer(
                 (requestedBuffers, readBuffers) -> {
                     try {
                         readBufferStart.complete(null);
-                        schedulerReleasedFinish.get();
+                        releasedFinish.get();
                     } catch (Exception e) {
-                        // re-throw all exception to IOException caught by read scheduler.
+                        // re-throw all exceptions as IOException, caught by the file data manager.
                         throw new IOException(e);
                     }
                 });
@@ -302,13 +302,13 @@ class HsResultPartitionReadSchedulerTest {
                     @Override
                     public void go() throws Exception {
                         readBufferStart.get();
-                        readScheduler.release();
-                        schedulerReleasedFinish.complete(null);
+                        fileDataManager.release();
+                        releasedFinish.complete(null);
                     }
                 };
         releaseThread.start();
 
-        readScheduler.registerNewSubpartition(0, subpartitionViewOperation);
+        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
 
         ioExecutor.trigger();
 
@@ -320,20 +320,20 @@ class HsResultPartitionReadSchedulerTest {
                 .hasMessageContaining("Result partition has been already released.");
     }
 
-    /** Test scheduler was released, but receive new subpartition reader registration. */
+    /** Test file data manager was released, but receive new subpartition reader registration. */
     @Test
-    void testRegisterSubpartitionReaderAfterSchedulerReleased() {
+    void testRegisterSubpartitionReaderAfterReleased() {
         TestingHsSubpartitionFileReader reader = new TestingHsSubpartitionFileReader();
         factory.allReaders.add(reader);
 
-        readScheduler.release();
+        fileDataManager.release();
         assertThatThrownBy(
                         () -> {
-                            readScheduler.registerNewSubpartition(0, subpartitionViewOperation);
+                            fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
                             ioExecutor.trigger();
                         })
                 .isInstanceOf(IllegalStateException.class)
-                .hasMessageContaining("HsResultPartitionReadScheduler is already released.");
+                .hasMessageContaining("HsFileDataManager is already released.");
     }
 
     private static FileChannel openFileChannel(Path path) throws IOException {


[flink] 08/13: [FLINK-27908] Add lifecycle methods to HsFileDataManager, HsMemoryDataManager and HsMemoryDataSpiller.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0d185b86bd4558183b9696c2eeda149d532a5a60
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Jul 28 18:27:20 2022 +0800

    [FLINK-27908] Add lifecycle methods to HsFileDataManager, HsMemoryDataManager and HsMemoryDataSpiller.
---
 .../partition/hybrid/HsFileDataManager.java        | 21 +++++++----
 .../partition/hybrid/HsMemoryDataManager.java      | 22 ++++++++++--
 .../partition/hybrid/HsMemoryDataSpiller.java      | 37 ++++++++++++++++---
 .../partition/hybrid/HsMemoryDataManagerTest.java  | 30 +++++++++++-----
 .../partition/hybrid/HsMemoryDataSpillerTest.java  | 42 ++++++++++++++++------
 5 files changed, 118 insertions(+), 34 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
index dbcd204300b..f37a1ff5164 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
@@ -133,6 +133,11 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
                 checkNotNull(hybridShuffleConfiguration.getBufferRequestTimeout());
     }
 
+    /** Setup read buffer pool. */
+    public void setup() {
+        bufferPool.initialize();
+    }
+
     @Override
     // Note, this method is synchronized on `this`, not `lock`. The purpose here is to prevent
     // concurrent `run()` executions. Concurrent calls to other methods are allowed.
@@ -163,14 +168,15 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
         }
     }
 
-    /**
-     * Releases this file data manager and returns a {@link CompletableFuture} which will be
-     * completed when all resources are released.
-     */
-    public CompletableFuture<?> release() {
+    public void deleteShuffleFile() {
+        IOUtils.deleteFileQuietly(dataFilePath);
+    }
+
+    /** Releases this file data manager and deletes the shuffle data after all readers are removed. */
+    public void release() {
         synchronized (lock) {
             if (isReleased) {
-                return releaseFuture;
+                return;
             }
             isReleased = true;
 
@@ -179,7 +185,8 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
             failSubpartitionReaders(
                     pendingReaders,
                     new IllegalStateException("Result partition has been already released."));
-            return releaseFuture;
+            // delete the shuffle file only once all readers have finished reading.
+            releaseFuture.thenRun(this::deleteShuffleFile);
         }
     }
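
The release() above is idempotent and defers the shuffle-file deletion until every
reader has been removed, by chaining the deletion onto releaseFuture. A minimal
JDK-only sketch of that pattern (the class and method names below are illustrative,
not Flink API):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.concurrent.CompletableFuture;

    class GuardedRelease {
        private final Object lock = new Object();
        // completed once the last subpartition reader has been removed
        private final CompletableFuture<Void> releaseFuture = new CompletableFuture<>();
        private final Path dataFilePath;
        private boolean isReleased;

        GuardedRelease(Path dataFilePath) {
            this.dataFilePath = dataFilePath;
        }

        void onAllReadersRemoved() {
            releaseFuture.complete(null);
        }

        void release() {
            synchronized (lock) {
                if (isReleased) {
                    return; // repeated release calls are no-ops
                }
                isReleased = true;
                // delete the shuffle file only after all readers are gone
                releaseFuture.thenRun(
                        () -> {
                            try {
                                Files.deleteIfExists(dataFilePath);
                            } catch (IOException ignored) {
                            }
                        });
            }
        }
    }

Note that release() no longer returns the future: callers fire and forget, and the
deletion runs asynchronously once the future completes.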
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
index 0f0744fe9ab..7acc9d0bdbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
@@ -28,7 +28,7 @@ import org.apache.flink.util.function.SupplierWithException;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
@@ -68,10 +68,11 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
             BufferPool bufferPool,
             HsSpillingStrategy spillStrategy,
             HsFileDataIndex fileDataIndex,
-            FileChannel dataFileChannel) {
+            Path dataFilePath)
+            throws IOException {
         this.numSubpartitions = numSubpartitions;
         this.bufferPool = bufferPool;
-        this.spiller = new HsMemoryDataSpiller(dataFileChannel);
+        this.spiller = new HsMemoryDataSpiller(dataFilePath);
         this.spillStrategy = spillStrategy;
         this.fileDataIndex = fileDataIndex;
         this.subpartitionMemoryDataManagers = new HsSubpartitionMemoryDataManager[numSubpartitions];
@@ -107,6 +108,21 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
         }
     }
 
+    /** Closes this {@link HsMemoryDataManager}; no more data can be appended to memory afterwards. */
+    public void close() {
+        Decision decision = callWithLock(() -> spillStrategy.onResultPartitionClosed(this));
+        handleDecision(Optional.of(decision));
+        spiller.close();
+    }
+
+    /**
+     * Releases this {@link HsMemoryDataManager}; all memory taken by this class will be
+     * recycled.
+     */
+    public void release() {
+        spiller.release();
+    }
+
     // ------------------------------------
     //        For Spilling Strategy
     // ------------------------------------
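
close() consults the spilling strategy under the data manager's lock before shutting
the spiller down, so the final spill decision sees a consistent view of all
subpartitions. A sketch of that callWithLock ordering, with a stand-in Strategy
interface (hypothetical, not the Flink type):

    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.Supplier;

    class CloseOrderingSketch {
        /** Stand-in for HsSpillingStrategy#onResultPartitionClosed. */
        interface Strategy {
            String onResultPartitionClosed();
        }

        private final ReentrantLock lock = new ReentrantLock();
        private final Strategy spillStrategy;

        CloseOrderingSketch(Strategy spillStrategy) {
            this.spillStrategy = spillStrategy;
        }

        private <T> T callWithLock(Supplier<T> callable) {
            lock.lock();
            try {
                return callable.get();
            } finally {
                lock.unlock();
            }
        }

        void close() {
            // 1. make the final decision while holding the lock
            String decision = callWithLock(spillStrategy::onResultPartitionClosed);
            // 2. apply it (spill the remaining buffers), then stop the spiller
            System.out.println("final decision: " + decision);
        }
    }

release(), by contrast, only recycles memory; it assumes close() has already run on
the task thread.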
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java
index be376b5e241..e2f4cb06aba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java
@@ -29,11 +29,15 @@ import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadF
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * This component is responsible for asynchronously writing in-memory data to disk. Each spilling
@@ -59,8 +63,10 @@ public class HsMemoryDataSpiller implements AutoCloseable {
     /** Records the current writing location. */
     private long totalBytesWritten;
 
-    public HsMemoryDataSpiller(FileChannel dataFileChannel) {
-        this.dataFileChannel = dataFileChannel;
+    public HsMemoryDataSpiller(Path dataFilePath) throws IOException {
+        this.dataFileChannel =
+                FileChannel.open(
+                        dataFilePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
     }
 
     /**
@@ -145,8 +151,31 @@ public class HsMemoryDataSpiller implements AutoCloseable {
         bufferWithHeaders[index + 1] = buffer.getNioBufferReadable();
     }
 
-    @Override
-    public void close() throws Exception {
+    /**
+     * Closes this {@link HsMemoryDataSpiller} when the result partition is closed. The spiller
+     * will no longer accept new spilling operations.
+     *
+     * <p>This method is only called by the main task thread.
+     */
+    public void close() {
         ioExecutor.shutdown();
     }
+
+    /**
+     * Releases this {@link HsMemoryDataSpiller} when the result partition is released. The spiller
+     * blocks until all previous spilling operations are done, then closes the file channel.
+     *
+     * <p>This method is only called by the rpc thread.
+     */
+    public void release() {
+        try {
+            ioExecutor.shutdown();
+            if (!ioExecutor.awaitTermination(5L, TimeUnit.MINUTES)) {
+                throw new TimeoutException("Shutdown spilling thread timeout.");
+            }
+            dataFileChannel.close();
+        } catch (Exception e) {
+            ExceptionUtils.rethrow(e);
+        }
+    }
 }
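
The close/release split maps onto the two halves of a standard ExecutorService
shutdown: close() merely stops accepting new work, while release() also waits for
in-flight spills before the file channel may be closed. A JDK-only sketch of the
contract:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    class SpillerShutdownSketch {
        private final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();

        CompletableFuture<Void> spillAsync(Runnable writeTask) {
            // throws RejectedExecutionException once close() has been called
            return CompletableFuture.runAsync(writeTask, ioExecutor);
        }

        /** Stop accepting new spills; already queued spills still run. */
        void close() {
            ioExecutor.shutdown();
        }

        /** Block until queued spills finish; afterwards resources can be closed. */
        void release() throws InterruptedException, TimeoutException {
            ioExecutor.shutdown();
            if (!ioExecutor.awaitTermination(5L, TimeUnit.MINUTES)) {
                throw new TimeoutException("Shutdown spilling thread timeout.");
            }
            // at this point it is safe to close the data file channel
        }
    }

This is exactly what the tests that follow assert: testClose() expects
RejectedExecutionException from spillAsync after close(), and testRelease() expects
the spilled data to be fully on disk once release() returns.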
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java
index 6608be9c500..97d16aad840 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java
@@ -30,10 +30,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -54,12 +51,11 @@ class HsMemoryDataManagerTest {
 
     private int bufferSize = Integer.BYTES;
 
-    private FileChannel dataFileChannel;
+    private Path dataFilePath;
 
     @BeforeEach
-    void before(@TempDir Path tempDir) throws Exception {
-        Path dataPath = Files.createFile(tempDir.resolve(".data"));
-        dataFileChannel = FileChannel.open(dataPath, StandardOpenOption.WRITE);
+    void before(@TempDir Path tempDir) {
+        this.dataFilePath = tempDir.resolve(".data");
     }
 
     @Test
@@ -179,6 +175,22 @@ class HsMemoryDataManagerTest {
         assertThat(globalDecisionFuture).isCompleted();
     }
 
+    @Test
+    void testResultPartitionClosed() throws Exception {
+        CompletableFuture<Void> resultPartitionReleaseFuture = new CompletableFuture<>();
+        HsSpillingStrategy spillingStrategy =
+                TestingSpillingStrategy.builder()
+                        .setOnResultPartitionClosedFunction(
+                                (ignore) -> {
+                                    resultPartitionReleaseFuture.complete(null);
+                                    return Decision.NO_ACTION;
+                                })
+                        .build();
+        HsMemoryDataManager memoryDataManager = createMemoryDataManager(spillingStrategy);
+        memoryDataManager.close();
+        assertThat(resultPartitionReleaseFuture).isCompleted();
+    }
+
     private HsMemoryDataManager createMemoryDataManager(HsSpillingStrategy spillStrategy)
             throws Exception {
         NetworkBufferPool networkBufferPool = new NetworkBufferPool(NUM_BUFFERS, bufferSize);
@@ -189,7 +201,7 @@ class HsMemoryDataManagerTest {
                 bufferPool,
                 spillStrategy,
                 new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
-                dataFileChannel);
+                dataFilePath);
     }
 
     private HsMemoryDataManager createMemoryDataManager(
@@ -202,7 +214,7 @@ class HsMemoryDataManagerTest {
                 bufferPool,
                 spillStrategy,
                 fileDataIndex,
-                dataFileChannel);
+                dataFilePath);
     }
 
     private static ByteBuffer createRecord(int value) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.java
index b31f522f799..e61465207a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.java
@@ -36,7 +36,6 @@ import org.junit.jupiter.api.io.TempDir;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
-import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
@@ -44,9 +43,11 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link HsMemoryDataSpiller}. */
 @ExtendWith(TestLoggerExtension.class)
@@ -57,18 +58,15 @@ class HsMemoryDataSpillerTest {
     private static final long BUFFER_WITH_HEADER_SIZE =
             BUFFER_SIZE + BufferReaderWriterUtil.HEADER_LENGTH;
 
-    private FileChannel dataFileChannel;
+    private FileChannel readChannel;
 
     private HsMemoryDataSpiller memoryDataSpiller;
 
     @BeforeEach
     void before(@TempDir Path tempDir) throws Exception {
-        dataFileChannel =
-                FileChannel.open(
-                        Files.createFile(tempDir.resolve(".data")),
-                        StandardOpenOption.WRITE,
-                        StandardOpenOption.READ);
-        this.memoryDataSpiller = new HsMemoryDataSpiller(dataFileChannel);
+        Path dataFilePath = tempDir.resolve(".data");
+        this.memoryDataSpiller = new HsMemoryDataSpiller(dataFilePath);
+        this.readChannel = FileChannel.open(dataFilePath, StandardOpenOption.READ);
     }
 
     @Test
@@ -115,6 +113,30 @@ class HsMemoryDataSpillerTest {
                         Tuple2.of(6, 2)));
     }
 
+    @Test
+    void testClose() {
+        List<BufferWithIdentity> bufferWithIdentityList = new ArrayList<>();
+        bufferWithIdentityList.addAll(
+                createBufferWithIdentityList(
+                        0, Arrays.asList(Tuple2.of(0, 0), Tuple2.of(1, 1), Tuple2.of(2, 2))));
+        memoryDataSpiller.close();
+        assertThatThrownBy(() -> memoryDataSpiller.spillAsync(bufferWithIdentityList))
+                .isInstanceOf(RejectedExecutionException.class);
+    }
+
+    @Test
+    void testRelease() throws Exception {
+        List<BufferWithIdentity> bufferWithIdentityList =
+                new ArrayList<>(
+                        createBufferWithIdentityList(
+                                0,
+                                Arrays.asList(Tuple2.of(0, 0), Tuple2.of(1, 1), Tuple2.of(2, 2))));
+        memoryDataSpiller.spillAsync(bufferWithIdentityList);
+        // blocks until spilling has finished.
+        memoryDataSpiller.release();
+        checkData(Arrays.asList(Tuple2.of(0, 0), Tuple2.of(1, 1), Tuple2.of(2, 2)));
+    }
+
     /**
      * create buffer with identity list.
      *
@@ -158,14 +180,12 @@ class HsMemoryDataSpillerTest {
     }
 
     private void checkData(List<Tuple2<Integer, Integer>> dataAndIndexes) throws Exception {
-        // reset channel position for read.
-        dataFileChannel.position(0);
         ByteBuffer headerBuf = BufferReaderWriterUtil.allocatedHeaderBuffer();
         MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE);
         for (Tuple2<Integer, Integer> dataAndIndex : dataAndIndexes) {
             Buffer buffer =
                     BufferReaderWriterUtil.readFromByteChannel(
-                            dataFileChannel, headerBuf, segment, (ignore) -> {});
+                            readChannel, headerBuf, segment, (ignore) -> {});
 
             assertThat(buffer.readableBytes()).isEqualTo(BUFFER_SIZE);
             assertThat(buffer.getNioBufferReadable().order(ByteOrder.nativeOrder()).getInt())


[flink] 07/13: [FLINK-27908] Extends onResultPartitionClosed to HsSpillingStrategy.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 828b3a58aca91e21b2fd328448b977d460e6e369
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Jul 28 18:31:48 2022 +0800

    [FLINK-27908] Extends onResultPartitionClosed to HsSpillingStrategy.
---
 .../partition/hybrid/HsFullSpillingStrategy.java   | 20 +++++++++++++
 .../hybrid/HsSelectiveSpillingStrategy.java        | 20 +++++++++++++
 .../partition/hybrid/HsSpillingStrategy.java       | 15 ++++++++++
 .../hybrid/HsFullSpillingStrategyTest.java         | 33 ++++++++++++++++++++++
 .../hybrid/HsSelectiveSpillingStrategyTest.java    | 32 +++++++++++++++++++++
 .../partition/hybrid/TestingSpillingStrategy.java  | 23 +++++++++++++--
 6 files changed, 141 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
index bc2889627d8..cfc737d2efd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
@@ -79,6 +79,26 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy {
         return builder.build();
     }
 
+    @Override
+    public Decision onResultPartitionClosed(HsSpillingInfoProvider spillingInfoProvider) {
+        Decision.Builder builder = Decision.builder();
+        for (int subpartitionId = 0;
+                subpartitionId < spillingInfoProvider.getNumSubpartitions();
+                subpartitionId++) {
+            builder.addBufferToSpill(
+                            subpartitionId,
+                            // get all buffers that have not started spilling.
+                            spillingInfoProvider.getBuffersInOrder(
+                                    subpartitionId, SpillStatus.NOT_SPILL, ConsumeStatus.ALL))
+                    .addBufferToRelease(
+                            subpartitionId,
+                            // get all buffers that have not been released.
+                            spillingInfoProvider.getBuffersInOrder(
+                                    subpartitionId, SpillStatus.ALL, ConsumeStatus.ALL));
+        }
+        return builder.build();
+    }
+
     private void checkSpill(HsSpillingInfoProvider spillingInfoProvider, Decision.Builder builder) {
         if (spillingInfoProvider.getNumTotalUnSpillBuffers() < numBuffersTriggerSpilling) {
             // In case situation changed since onBufferFinished() returns Optional#empty()
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
index 3f173e50f2a..dcd53393bdf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
@@ -102,4 +102,24 @@ public class HsSelectiveSpillingStrategy implements HsSpillingStrategy {
                 });
         return builder.build();
     }
+
+    @Override
+    public Decision onResultPartitionClosed(HsSpillingInfoProvider spillingInfoProvider) {
+        Decision.Builder builder = Decision.builder();
+        for (int subpartitionId = 0;
+                subpartitionId < spillingInfoProvider.getNumSubpartitions();
+                subpartitionId++) {
+            builder.addBufferToSpill(
+                            subpartitionId,
+                            // get all buffers that have not started spilling.
+                            spillingInfoProvider.getBuffersInOrder(
+                                    subpartitionId, SpillStatus.NOT_SPILL, ConsumeStatus.ALL))
+                    .addBufferToRelease(
+                            subpartitionId,
+                            // get all buffers that have not been released.
+                            spillingInfoProvider.getBuffersInOrder(
+                                    subpartitionId, SpillStatus.ALL, ConsumeStatus.ALL));
+        }
+        return builder.build();
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
index 4525b8fd442..fb4a5ab58d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
@@ -71,6 +71,15 @@ public interface HsSpillingStrategy {
      */
     Decision decideActionWithGlobalInfo(HsSpillingInfoProvider spillingInfoProvider);
 
+    /**
+     * Makes a decision when the result partition is closed. Because this method directly accesses
+     * the {@link HsSpillingInfoProvider}, the caller must take care of thread safety.
+     *
+     * @param spillingInfoProvider provides information about the current status.
+     * @return A {@link Decision} based on the global information.
+     */
+    Decision onResultPartitionClosed(HsSpillingInfoProvider spillingInfoProvider);
+
     /**
      * This class represents the spill and release decision made by {@link HsSpillingStrategy}, in
      * other words, which data is to be spilled and which data is to be released.
@@ -143,6 +152,12 @@ public interface HsSpillingStrategy {
                 return this;
             }
 
+            public Builder addBufferToRelease(
+                    int subpartitionId, Deque<BufferIndexAndChannel> buffers) {
+                bufferToRelease.computeIfAbsent(subpartitionId, ArrayList::new).addAll(buffers);
+                return this;
+            }
+
             public Decision build() {
                 return new Decision(bufferToSpill, bufferToRelease);
             }
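
Both built-in strategies implement the new hook identically: when the partition
closes, every buffer that has not started spilling is spilled, and every buffer is
marked for release once it is safely on disk. A simplified stand-in (the interfaces
below are illustrative, not the Flink API) shows the shape of that decision:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class OnClosedDecisionSketch {
        /** Stand-in for HsSpillingInfoProvider. */
        interface InfoProvider {
            int getNumSubpartitions();
            List<Integer> getUnspilledBuffers(int subpartitionId);
            List<Integer> getAllBuffers(int subpartitionId);
        }

        /** Stand-in for building an HsSpillingStrategy.Decision on close. */
        static Map<String, Map<Integer, List<Integer>>> onResultPartitionClosed(
                InfoProvider provider) {
            Map<Integer, List<Integer>> toSpill = new HashMap<>();
            Map<Integer, List<Integer>> toRelease = new HashMap<>();
            for (int sub = 0; sub < provider.getNumSubpartitions(); sub++) {
                // flush everything that is still memory-only ...
                toSpill.put(sub, provider.getUnspilledBuffers(sub));
                // ... and release every buffer afterwards
                toRelease.put(sub, provider.getAllBuffers(sub));
            }
            Map<String, Map<Integer, List<Integer>>> decision = new HashMap<>();
            decision.put("spill", toSpill);
            decision.put("release", toRelease);
            return decision;
        }
    }

Since HsFullSpillingStrategy and HsSelectiveSpillingStrategy contain the same
onResultPartitionClosed body, the logic could plausibly live in a default method on
HsSpillingStrategy; the commit keeps the two copies separate.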
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
index b8d1289efb8..44701cbab76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -182,4 +183,36 @@ class HsFullSpillingStrategyTest {
                 .extractingByKey(subpartitionId)
                 .satisfies((buffers) -> assertThat(buffers).hasSizeGreaterThan(numReleaseBuffer));
     }
+
+    @Test
+    void testOnResultPartitionClosed() {
+        final int subpartition1 = 0;
+        final int subpartition2 = 1;
+
+        List<BufferIndexAndChannel> subpartitionBuffer1 =
+                createBufferIndexAndChannelsList(subpartition1, 0, 1, 2, 3);
+        List<BufferIndexAndChannel> subpartitionBuffer2 =
+                createBufferIndexAndChannelsList(subpartition2, 0, 1, 2);
+        TestingSpillingInfoProvider spillInfoProvider =
+                TestingSpillingInfoProvider.builder()
+                        .setGetNumSubpartitionsSupplier(() -> 2)
+                        .addSubpartitionBuffers(subpartition1, subpartitionBuffer1)
+                        .addSubpartitionBuffers(subpartition2, subpartitionBuffer2)
+                        .addSpillBuffers(subpartition1, Arrays.asList(2, 3))
+                        .addConsumedBuffers(subpartition1, Collections.singletonList(0))
+                        .addSpillBuffers(subpartition2, Collections.singletonList(2))
+                        .build();
+
+        Decision decision = spillStrategy.onResultPartitionClosed(spillInfoProvider);
+
+        Map<Integer, List<BufferIndexAndChannel>> expectedToSpillBuffers = new HashMap<>();
+        expectedToSpillBuffers.put(subpartition1, subpartitionBuffer1.subList(0, 2));
+        expectedToSpillBuffers.put(subpartition2, subpartitionBuffer2.subList(0, 2));
+        assertThat(decision.getBufferToSpill()).isEqualTo(expectedToSpillBuffers);
+
+        Map<Integer, List<BufferIndexAndChannel>> expectedToReleaseBuffers = new HashMap<>();
+        expectedToReleaseBuffers.put(subpartition1, subpartitionBuffer1.subList(0, 4));
+        expectedToReleaseBuffers.put(subpartition2, subpartitionBuffer2.subList(0, 3));
+        assertThat(decision.getBufferToRelease()).isEqualTo(expectedToReleaseBuffers);
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java
index 6862c4c6e4a..dfeb7a0f425 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java
@@ -133,4 +133,36 @@ class HsSelectiveSpillingStrategyTest {
         assertThat(globalDecision.getBufferToSpill()).isEqualTo(expectedBuffers);
         assertThat(globalDecision.getBufferToRelease()).isEqualTo(expectedBuffers);
     }
+
+    @Test
+    void testOnResultPartitionClosed() {
+        final int subpartition1 = 0;
+        final int subpartition2 = 1;
+
+        List<BufferIndexAndChannel> subpartitionBuffer1 =
+                createBufferIndexAndChannelsList(subpartition1, 0, 1, 2, 3);
+        List<BufferIndexAndChannel> subpartitionBuffer2 =
+                createBufferIndexAndChannelsList(subpartition2, 0, 1, 2);
+        TestingSpillingInfoProvider spillInfoProvider =
+                TestingSpillingInfoProvider.builder()
+                        .setGetNumSubpartitionsSupplier(() -> 2)
+                        .addSubpartitionBuffers(subpartition1, subpartitionBuffer1)
+                        .addSubpartitionBuffers(subpartition2, subpartitionBuffer2)
+                        .addSpillBuffers(subpartition1, Arrays.asList(2, 3))
+                        .addConsumedBuffers(subpartition1, Collections.singletonList(0))
+                        .addSpillBuffers(subpartition2, Collections.singletonList(2))
+                        .build();
+
+        Decision decision = spillStrategy.onResultPartitionClosed(spillInfoProvider);
+
+        Map<Integer, List<BufferIndexAndChannel>> expectedToSpillBuffers = new HashMap<>();
+        expectedToSpillBuffers.put(subpartition1, subpartitionBuffer1.subList(0, 2));
+        expectedToSpillBuffers.put(subpartition2, subpartitionBuffer2.subList(0, 2));
+        assertThat(decision.getBufferToSpill()).isEqualTo(expectedToSpillBuffers);
+
+        Map<Integer, List<BufferIndexAndChannel>> expectedToReleaseBuffers = new HashMap<>();
+        expectedToReleaseBuffers.put(subpartition1, subpartitionBuffer1.subList(0, 4));
+        expectedToReleaseBuffers.put(subpartition2, subpartitionBuffer2.subList(0, 3));
+        assertThat(decision.getBufferToRelease()).isEqualTo(expectedToReleaseBuffers);
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java
index 4cce53db0a9..61a730fc5c8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java
@@ -32,15 +32,19 @@ public class TestingSpillingStrategy implements HsSpillingStrategy {
 
     private final Function<HsSpillingInfoProvider, Decision> decideActionWithGlobalInfoFunction;
 
+    private final Function<HsSpillingInfoProvider, Decision> onResultPartitionClosedFunction;
+
     private TestingSpillingStrategy(
             BiFunction<Integer, Integer, Optional<Decision>> onMemoryUsageChangedFunction,
             Function<Integer, Optional<Decision>> onBufferFinishedFunction,
             Function<BufferIndexAndChannel, Optional<Decision>> onBufferConsumedFunction,
-            Function<HsSpillingInfoProvider, Decision> decideActionWithGlobalInfoFunction) {
+            Function<HsSpillingInfoProvider, Decision> decideActionWithGlobalInfoFunction,
+            Function<HsSpillingInfoProvider, Decision> onResultPartitionClosedFunction) {
         this.onMemoryUsageChangedFunction = onMemoryUsageChangedFunction;
         this.onBufferFinishedFunction = onBufferFinishedFunction;
         this.onBufferConsumedFunction = onBufferConsumedFunction;
         this.decideActionWithGlobalInfoFunction = decideActionWithGlobalInfoFunction;
+        this.onResultPartitionClosedFunction = onResultPartitionClosedFunction;
     }
 
     @Override
@@ -64,6 +68,11 @@ public class TestingSpillingStrategy implements HsSpillingStrategy {
         return decideActionWithGlobalInfoFunction.apply(spillingInfoProvider);
     }
 
+    @Override
+    public Decision onResultPartitionClosed(HsSpillingInfoProvider spillingInfoProvider) {
+        return onResultPartitionClosedFunction.apply(spillingInfoProvider);
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -82,6 +91,9 @@ public class TestingSpillingStrategy implements HsSpillingStrategy {
         private Function<HsSpillingInfoProvider, Decision> decideActionWithGlobalInfoFunction =
                 (ignore) -> Decision.NO_ACTION;
 
+        private Function<HsSpillingInfoProvider, Decision> onResultPartitionClosedFunction =
+                (ignore) -> Decision.NO_ACTION;
+
         private Builder() {}
 
         public Builder setOnMemoryUsageChangedFunction(
@@ -108,12 +120,19 @@ public class TestingSpillingStrategy implements HsSpillingStrategy {
             return this;
         }
 
+        public Builder setOnResultPartitionClosedFunction(
+                Function<HsSpillingInfoProvider, Decision> onResultPartitionClosedFunction) {
+            this.onResultPartitionClosedFunction = onResultPartitionClosedFunction;
+            return this;
+        }
+
         public TestingSpillingStrategy build() {
             return new TestingSpillingStrategy(
                     onMemoryUsageChangedFunction,
                     onBufferFinishedFunction,
                     onBufferConsumedFunction,
-                    decideActionWithGlobalInfoFunction);
+                    decideActionWithGlobalInfoFunction,
+                    onResultPartitionClosedFunction);
         }
     }
 }


[flink] 12/13: [FLINK-27908] Introduce HsResultPartition

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fed2962d3a25179dd0fac508cf3eb29ed7b533e1
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Jul 28 22:34:06 2022 +0800

    [FLINK-27908] Introduce HsResultPartition
---
 .../partition/hybrid/HsResultPartition.java        | 266 ++++++++++++
 .../partition/hybrid/HsResultPartitionTest.java    | 476 +++++++++++++++++++++
 2 files changed, 742 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
new file mode 100644
index 00000000000..100df9f7cff
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.api.EndOfData;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.StopMode;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link HsResultPartition} appends records and events to the {@link HsMemoryDataManager}; the
+ * shuffle data may be spilled to disk according to the {@link HsSpillingStrategy}, and the
+ * downstream can consume data from either memory or disk.
+ */
+public class HsResultPartition extends ResultPartition {
+    public static final String DATA_FILE_SUFFIX = ".hybrid.data";
+
+    private final HsFileDataIndex dataIndex;
+
+    private final HsFileDataManager fileDataManager;
+
+    private final Path dataFilePath;
+
+    private final int networkBufferSize;
+
+    private final HybridShuffleConfiguration hybridShuffleConfiguration;
+
+    private boolean hasNotifiedEndOfUserRecords;
+
+    @Nullable private HsMemoryDataManager memoryDataManager;
+
+    public HsResultPartition(
+            String owningTaskName,
+            int partitionIndex,
+            ResultPartitionID partitionId,
+            ResultPartitionType partitionType,
+            int numSubpartitions,
+            int numTargetKeyGroups,
+            BatchShuffleReadBufferPool readBufferPool,
+            Executor readIOExecutor,
+            ResultPartitionManager partitionManager,
+            String dataFileBasePath,
+            int networkBufferSize,
+            HybridShuffleConfiguration hybridShuffleConfiguration,
+            @Nullable BufferCompressor bufferCompressor,
+            SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
+        super(
+                owningTaskName,
+                partitionIndex,
+                partitionId,
+                partitionType,
+                numSubpartitions,
+                numTargetKeyGroups,
+                partitionManager,
+                bufferCompressor,
+                bufferPoolFactory);
+        this.networkBufferSize = networkBufferSize;
+        this.dataIndex = new HsFileDataIndexImpl(numSubpartitions);
+        this.dataFilePath = new File(dataFileBasePath + DATA_FILE_SUFFIX).toPath();
+        this.hybridShuffleConfiguration = hybridShuffleConfiguration;
+        this.fileDataManager =
+                new HsFileDataManager(
+                        readBufferPool,
+                        readIOExecutor,
+                        dataIndex,
+                        dataFilePath,
+                        HsSubpartitionFileReaderImpl.Factory.INSTANCE,
+                        hybridShuffleConfiguration);
+    }
+
+    // Called by task thread.
+    @Override
+    protected void setupInternal() throws IOException {
+        if (isReleased()) {
+            throw new IOException("Result partition has been released.");
+        }
+        this.fileDataManager.setup();
+        this.memoryDataManager =
+                new HsMemoryDataManager(
+                        numSubpartitions,
+                        networkBufferSize,
+                        bufferPool,
+                        getSpillingStrategy(hybridShuffleConfiguration),
+                        dataIndex,
+                        dataFilePath);
+    }
+
+    @Override
+    public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
+        emit(record, targetSubpartition, Buffer.DataType.DATA_BUFFER);
+    }
+
+    @Override
+    public void broadcastRecord(ByteBuffer record) throws IOException {
+        broadcast(record, Buffer.DataType.DATA_BUFFER);
+    }
+
+    @Override
+    public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
+        Buffer buffer = EventSerializer.toBuffer(event, isPriorityEvent);
+        try {
+            ByteBuffer serializedEvent = buffer.getNioBufferReadable();
+            broadcast(serializedEvent, buffer.getDataType());
+        } finally {
+            buffer.recycleBuffer();
+        }
+    }
+
+    private void broadcast(ByteBuffer record, Buffer.DataType dataType) throws IOException {
+        for (int i = 0; i < numSubpartitions; i++) {
+            emit(record.duplicate(), i, dataType);
+        }
+    }
+
+    private void emit(ByteBuffer record, int targetSubpartition, Buffer.DataType dataType)
+            throws IOException {
+        checkInProduceState();
+        checkNotNull(memoryDataManager).append(record, targetSubpartition, dataType);
+    }
+
+    @Override
+    public ResultSubpartitionView createSubpartitionView(
+            int subpartitionId, BufferAvailabilityListener availabilityListener)
+            throws IOException {
+        checkState(!isReleased(), "ResultPartition already released.");
+        HsSubpartitionView subpartitionView = new HsSubpartitionView(availabilityListener);
+        HsDataView diskDataView =
+                fileDataManager.registerNewSubpartition(subpartitionId, subpartitionView);
+
+        HsDataView memoryDataView =
+                checkNotNull(memoryDataManager)
+                        .registerSubpartitionView(subpartitionId, subpartitionView);
+
+        subpartitionView.setDiskDataView(diskDataView);
+        subpartitionView.setMemoryDataView(memoryDataView);
+        return subpartitionView;
+    }
+
+    @Override
+    public void alignedBarrierTimeout(long checkpointId) throws IOException {
+        // Nothing to do.
+    }
+
+    @Override
+    public void abortCheckpoint(long checkpointId, CheckpointException cause) {
+        // Nothing to do.
+    }
+
+    @Override
+    public void flushAll() {
+        // Nothing to do.
+    }
+
+    @Override
+    public void flush(int subpartitionIndex) {
+        // Nothing to do.
+    }
+
+    @Override
+    public void finish() throws IOException {
+        broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
+
+        checkState(!isReleased(), "Result partition is already released.");
+
+        super.finish();
+    }
+
+    @Override
+    public void close() {
+        // close is called when task is finished or failed.
+        checkNotNull(memoryDataManager).close();
+        super.close();
+    }
+
+    @Override
+    protected void releaseInternal() {
+        // release is called when released by the scheduler, later than close.
+        // main work:
+        // 1. release the read scheduler.
+        // 2. delete the shuffle file.
+        // 3. release all data in memory.
+
+        fileDataManager.release();
+
+        checkNotNull(memoryDataManager).release();
+    }
+
+    @Override
+    public int getNumberOfQueuedBuffers() {
+        // Batch shuffle does not need to provide QueuedBuffers information
+        return 0;
+    }
+
+    @Override
+    public long getSizeOfQueuedBuffersUnsafe() {
+        // Batch shuffle does not need to provide QueuedBuffers information
+        return 0;
+    }
+
+    @Override
+    public int getNumberOfQueuedBuffers(int targetSubpartition) {
+        // Batch shuffle does not need to provide QueuedBuffers information
+        return 0;
+    }
+
+    @Override
+    public void notifyEndOfData(StopMode mode) throws IOException {
+        if (!hasNotifiedEndOfUserRecords) {
+            broadcastEvent(new EndOfData(mode), false);
+            hasNotifiedEndOfUserRecords = true;
+        }
+    }
+
+    private HsSpillingStrategy getSpillingStrategy(
+            HybridShuffleConfiguration hybridShuffleConfiguration) {
+        switch (hybridShuffleConfiguration.getSpillingStrategyType()) {
+            case FULL:
+                return new HsFullSpillingStrategy(hybridShuffleConfiguration);
+            case SELECTIVE:
+                return new HsSelectiveSpillingStrategy(hybridShuffleConfiguration);
+            default:
+                throw new IllegalConfigurationException("Illegal spilling strategy.");
+        }
+    }
+}
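
Pieced together from the overrides above, the intended lifecycle is: setup on the
task thread, append via emit/broadcast, finish, close, and finally release from the
scheduler. A sketch of driving a partition through that sequence (assuming a
partition constructed as in createHsResultPartition in the test below):

    import org.apache.flink.runtime.io.network.partition.hybrid.HsResultPartition;

    import java.nio.ByteBuffer;

    class LifecycleSketch {
        static void drive(HsResultPartition partition) throws Exception {
            partition.setup();                                 // setupInternal(): file + memory managers
            partition.emitRecord(ByteBuffer.allocate(128), 0); // append to HsMemoryDataManager
            partition.finish();                                // broadcasts EndOfPartitionEvent
            partition.close();                                 // final spill decision, stop the spiller
            partition.release();                               // releaseInternal(): delete file, recycle memory
        }
    }

createSubpartitionView can be called at any point after setup; it wires both a memory
data view and a disk data view into the returned HsSubpartitionView, which is how the
downstream consumes from either tier transparently.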
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
new file mode 100644
index 00000000000..6c7d7b66b34
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration.SpillingStrategyType;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link HsResultPartition}. */
+class HsResultPartitionTest {
+
+    private static final int bufferSize = 1024;
+
+    private static final int totalBuffers = 1000;
+
+    private static final int totalBytes = 32 * 1024 * 1024;
+
+    private static final int numThreads = 4;
+
+    private FileChannelManager fileChannelManager;
+
+    private NetworkBufferPool globalPool;
+
+    private BatchShuffleReadBufferPool readBufferPool;
+
+    private ExecutorService readIOExecutor;
+
+    @TempDir public Path tempDataPath;
+
+    @BeforeEach
+    void before() {
+        fileChannelManager =
+                new FileChannelManagerImpl(new String[] {tempDataPath.toString()}, "testing");
+        globalPool = new NetworkBufferPool(totalBuffers, bufferSize);
+        readBufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize);
+        readIOExecutor = Executors.newFixedThreadPool(numThreads);
+    }
+
+    @AfterEach
+    void after() throws Exception {
+        fileChannelManager.close();
+        globalPool.destroy();
+        readBufferPool.destroy();
+        readIOExecutor.shutdown();
+    }
+
+    @Test
+    void testEmit() throws Exception {
+        int numBuffers = 100;
+        int numSubpartitions = 10;
+        int numRecords = 1000;
+        Random random = new Random();
+
+        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
+
+        try (HsResultPartition partition = createHsResultPartition(numSubpartitions, bufferPool)) {
+            Queue<Tuple2<ByteBuffer, Buffer.DataType>>[] dataWritten = new Queue[numSubpartitions];
+            Queue<Buffer>[] buffersRead = new Queue[numSubpartitions];
+            for (int i = 0; i < numSubpartitions; ++i) {
+                dataWritten[i] = new ArrayDeque<>();
+                buffersRead[i] = new ArrayDeque<>();
+            }
+
+            int[] numBytesWritten = new int[numSubpartitions];
+            int[] numBytesRead = new int[numSubpartitions];
+            Arrays.fill(numBytesWritten, 0);
+            Arrays.fill(numBytesRead, 0);
+
+            for (int i = 0; i < numRecords; ++i) {
+                ByteBuffer record = generateRandomData(random.nextInt(2 * bufferSize) + 1, random);
+                boolean isBroadCast = random.nextBoolean();
+
+                if (isBroadCast) {
+                    partition.broadcastRecord(record);
+                    for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
+                        recordDataWritten(
+                                record,
+                                dataWritten,
+                                subpartition,
+                                numBytesWritten,
+                                Buffer.DataType.DATA_BUFFER);
+                    }
+                } else {
+                    int subpartition = random.nextInt(numSubpartitions);
+                    partition.emitRecord(record, subpartition);
+                    recordDataWritten(
+                            record,
+                            dataWritten,
+                            subpartition,
+                            numBytesWritten,
+                            Buffer.DataType.DATA_BUFFER);
+                }
+            }
+
+            partition.finish();
+
+            for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
+                ByteBuffer record = EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE);
+                recordDataWritten(
+                        record,
+                        dataWritten,
+                        subpartition,
+                        numBytesWritten,
+                        Buffer.DataType.EVENT_BUFFER);
+            }
+
+            Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[] viewAndListeners =
+                    createSubpartitionViews(partition, numSubpartitions);
+            readData(
+                    viewAndListeners,
+                    (buffer, subpartitionId) -> {
+                        int numBytes = buffer.readableBytes();
+                        numBytesRead[subpartitionId] += numBytes;
+
+                        MemorySegment segment =
+                                MemorySegmentFactory.allocateUnpooledSegment(numBytes);
+                        segment.put(0, buffer.getNioBufferReadable(), numBytes);
+                        buffersRead[subpartitionId].add(
+                                new NetworkBuffer(
+                                        segment, (buf) -> {}, buffer.getDataType(), numBytes));
+                    });
+            checkWriteReadResult(
+                    numSubpartitions, numBytesWritten, numBytesRead, dataWritten, buffersRead);
+        }
+    }
+
+    @Test
+    void testBroadcastEvent() throws Exception {
+        final int numBuffers = 1;
+        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
+        try (HsResultPartition resultPartition = createHsResultPartition(2, bufferPool)) {
+            resultPartition.broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
+            // broadcasting an event does not request a buffer
+            assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isEqualTo(1);
+
+            Tuple2[] viewAndListeners = createSubpartitionViews(resultPartition, 2);
+
+            boolean[] receivedEvent = new boolean[2];
+            readData(
+                    viewAndListeners,
+                    (buffer, subpartition) -> {
+                        assertThat(buffer.getDataType().isEvent()).isTrue();
+                        try {
+                            AbstractEvent event =
+                                    EventSerializer.fromSerializedEvent(
+                                            buffer.readOnlySlice().getNioBufferReadable(),
+                                            HsResultPartitionTest.class.getClassLoader());
+                            assertThat(event).isInstanceOf(EndOfPartitionEvent.class);
+                            receivedEvent[subpartition] = true;
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+
+            assertThat(receivedEvent).containsExactly(true, true);
+        }
+    }
+
+    @Test
+    void testClose() throws Exception {
+        final int numBuffers = 1;
+
+        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
+        HsResultPartition partition = createHsResultPartition(1, bufferPool);
+
+        partition.close();
+        // emitting data to a closed partition will throw an exception.
+        assertThatThrownBy(() -> partition.emitRecord(ByteBuffer.allocate(bufferSize), 0));
+    }
+
+    @Test
+    @Timeout(30)
+    void testRelease() throws Exception {
+        final int numBuffers = 10;
+
+        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
+        HsResultPartition partition = createHsResultPartition(2, bufferPool);
+
+        partition.emitRecord(ByteBuffer.allocate(bufferSize * numBuffers), 1);
+        assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(numBuffers);
+
+        partition.close();
+        assertThat(bufferPool.isDestroyed()).isTrue();
+
+        partition.release();
+
+        while (checkNotNull(fileChannelManager.getPaths()[0].listFiles()).length != 0) {
+            Thread.sleep(10);
+        }
+
+        assertThat(totalBuffers).isEqualTo(globalPool.getNumberOfAvailableMemorySegments());
+    }
+
+    @Test
+    void testCreateSubpartitionViewAfterRelease() throws Exception {
+        final int numBuffers = 10;
+        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
+        HsResultPartition resultPartition = createHsResultPartition(2, bufferPool);
+        resultPartition.release();
+        assertThatThrownBy(
+                        () ->
+                                resultPartition.createSubpartitionView(
+                                        0, new NoOpBufferAvailablityListener()))
+                .isInstanceOf(IllegalStateException.class);
+    }
+
+    @Test
+    void testAvailability() throws Exception {
+        final int numBuffers = 2;
+
+        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
+        HsResultPartition partition = createHsResultPartition(1, bufferPool);
+
+        partition.emitRecord(ByteBuffer.allocate(bufferSize * numBuffers), 0);
+        assertThat(partition.isAvailable()).isFalse();
+
+        // release the partition to recycle buffers.
+        partition.close();
+        partition.release();
+
+        assertThat(partition.isAvailable()).isTrue();
+    }
+
+    private static void recordDataWritten(
+            ByteBuffer record,
+            Queue<Tuple2<ByteBuffer, Buffer.DataType>>[] dataWritten,
+            int subpartition,
+            int[] numBytesWritten,
+            Buffer.DataType dataType) {
+        record.rewind();
+        dataWritten[subpartition].add(Tuple2.of(record, dataType));
+        numBytesWritten[subpartition] += record.remaining();
+    }
+
+    private long readData(
+            Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[] viewAndListeners,
+            BiConsumer<Buffer, Integer> bufferProcessor)
+            throws Exception {
+        AtomicInteger dataSize = new AtomicInteger(0);
+        AtomicInteger numEndOfPartitionEvents = new AtomicInteger(0);
+        CheckedThread[] subpartitionViewThreads = new CheckedThread[viewAndListeners.length];
+        for (int i = 0; i < viewAndListeners.length; i++) {
+            // start thread for each view.
+            final int subpartition = i;
+            CheckedThread subpartitionViewThread =
+                    new CheckedThread() {
+                        @Override
+                        public void go() throws Exception {
+                            ResultSubpartitionView view = viewAndListeners[subpartition].f0;
+                            while (true) {
+                                ResultSubpartition.BufferAndBacklog bufferAndBacklog =
+                                        view.getNextBuffer();
+                                if (bufferAndBacklog == null) {
+                                    viewAndListeners[subpartition].f1.waitForData();
+                                    continue;
+                                }
+                                Buffer buffer = bufferAndBacklog.buffer();
+                                bufferProcessor.accept(buffer, subpartition);
+                                dataSize.addAndGet(buffer.readableBytes());
+                                buffer.recycleBuffer();
+
+                                if (!buffer.isBuffer()) {
+                                    numEndOfPartitionEvents.incrementAndGet();
+                                    view.releaseAllResources();
+                                    break;
+                                }
+                                if (bufferAndBacklog.getNextDataType() == Buffer.DataType.NONE) {
+                                    viewAndListeners[subpartition].f1.waitForData();
+                                }
+                            }
+                        }
+                    };
+            subpartitionViewThreads[subpartition] = subpartitionViewThread;
+            subpartitionViewThread.start();
+        }
+        for (CheckedThread thread : subpartitionViewThreads) {
+            thread.sync();
+        }
+        return dataSize.get();
+    }
+
+    private static ByteBuffer generateRandomData(int dataSize, Random random) {
+        byte[] dataWritten = new byte[dataSize];
+        random.nextBytes(dataWritten);
+        return ByteBuffer.wrap(dataWritten);
+    }
+
+    private HsResultPartition createHsResultPartition(
+            int numSubpartitions, BufferPool bufferPool, int numBuffersTriggerSpilling)
+            throws IOException {
+        HsResultPartition hsResultPartition =
+                new HsResultPartition(
+                        "HsResultPartitionTest",
+                        0,
+                        new ResultPartitionID(),
+                        ResultPartitionType.HYBRID,
+                        numSubpartitions,
+                        numSubpartitions,
+                        readBufferPool,
+                        readIOExecutor,
+                        new ResultPartitionManager(),
+                        fileChannelManager.createChannel().getPath(),
+                        bufferSize,
+                        HybridShuffleConfiguration.builder(
+                                        numSubpartitions, readBufferPool.getNumBuffersPerRequest())
+                                .setSpillingStrategyType(SpillingStrategyType.FULL)
+                                .setFullStrategyNumBuffersTriggerSpilling(numBuffersTriggerSpilling)
+                                .build(),
+                        null,
+                        () -> bufferPool);
+        hsResultPartition.setup();
+        return hsResultPartition;
+    }
+
+    private HsResultPartition createHsResultPartition(int numSubpartitions, BufferPool bufferPool)
+            throws IOException {
+        HsResultPartition hsResultPartition =
+                new HsResultPartition(
+                        "HsResultPartitionTest",
+                        0,
+                        new ResultPartitionID(),
+                        ResultPartitionType.HYBRID,
+                        numSubpartitions,
+                        numSubpartitions,
+                        readBufferPool,
+                        readIOExecutor,
+                        new ResultPartitionManager(),
+                        fileChannelManager.createChannel().getPath(),
+                        bufferSize,
+                        HybridShuffleConfiguration.builder(
+                                        numSubpartitions, readBufferPool.getNumBuffersPerRequest())
+                                .build(),
+                        null,
+                        () -> bufferPool);
+        hsResultPartition.setup();
+        return hsResultPartition;
+    }
+
+    private static void checkWriteReadResult(
+            int numSubpartitions,
+            int[] numBytesWritten,
+            int[] numBytesRead,
+            Queue<Tuple2<ByteBuffer, Buffer.DataType>>[] dataWritten,
+            Queue<Buffer>[] buffersRead) {
+        for (int subpartitionIndex = 0; subpartitionIndex < numSubpartitions; ++subpartitionIndex) {
+            assertThat(numBytesWritten[subpartitionIndex])
+                    .isEqualTo(numBytesRead[subpartitionIndex]);
+
+            List<Tuple2<ByteBuffer, Buffer.DataType>> eventsWritten = new ArrayList<>();
+            List<Buffer> eventsRead = new ArrayList<>();
+
+            ByteBuffer subpartitionDataWritten =
+                    ByteBuffer.allocate(numBytesWritten[subpartitionIndex]);
+            for (Tuple2<ByteBuffer, Buffer.DataType> bufferDataTypeTuple :
+                    dataWritten[subpartitionIndex]) {
+                subpartitionDataWritten.put(bufferDataTypeTuple.f0);
+                bufferDataTypeTuple.f0.rewind();
+                if (bufferDataTypeTuple.f1.isEvent()) {
+                    eventsWritten.add(bufferDataTypeTuple);
+                }
+            }
+
+            ByteBuffer subpartitionDataRead = ByteBuffer.allocate(numBytesRead[subpartitionIndex]);
+            for (Buffer buffer : buffersRead[subpartitionIndex]) {
+                subpartitionDataRead.put(buffer.getNioBufferReadable());
+                if (!buffer.isBuffer()) {
+                    eventsRead.add(buffer);
+                }
+            }
+
+            subpartitionDataWritten.flip();
+            subpartitionDataRead.flip();
+            assertThat(subpartitionDataWritten).isEqualTo(subpartitionDataRead);
+
+            assertThat(eventsWritten.size()).isEqualTo(eventsRead.size());
+            for (int i = 0; i < eventsWritten.size(); i++) {
+                assertThat(eventsWritten.get(i).f1).isEqualTo(eventsRead.get(i).getDataType());
+                assertThat(eventsWritten.get(i).f0)
+                        .isEqualTo(eventsRead.get(i).getNioBufferReadable());
+            }
+        }
+    }
+
+    private Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[]
+            createSubpartitionViews(HsResultPartition partition, int numSubpartitions)
+                    throws Exception {
+        Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[] viewAndListeners =
+                new Tuple2[numSubpartitions];
+        for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
+            TestingBufferAvailabilityListener listener = new TestingBufferAvailabilityListener();
+            viewAndListeners[subpartition] =
+                    Tuple2.of(partition.createSubpartitionView(subpartition, listener), listener);
+        }
+        return viewAndListeners;
+    }
+
+    private static final class TestingBufferAvailabilityListener
+            implements BufferAvailabilityListener {
+
+        private int numNotifications;
+
+        @Override
+        public synchronized void notifyDataAvailable() {
+            if (numNotifications == 0) {
+                notifyAll();
+            }
+            ++numNotifications;
+        }
+
+        public synchronized void waitForData() throws InterruptedException {
+            if (numNotifications == 0) {
+                wait();
+            }
+            numNotifications = 0;
+        }
+    }
+}

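The reader helper above follows a simple poll-then-park protocol: each consumer thread repeatedly calls getNextBuffer() on its subpartition view and parks on the TestingBufferAvailabilityListener whenever no buffer (or no follow-up data) is available. Below is a minimal, self-contained sketch of that wait/notify handshake; the queue-based producer and Integer records are hypothetical stand-ins for the partition view and network buffers, not Flink types.

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    public class AvailabilityHandshakeDemo {

        /** Same idea as TestingBufferAvailabilityListener: coalesce notifications. */
        static final class AvailabilityListener {
            private int numNotifications;

            // Producer side: wake a parked consumer on the 0 -> 1 transition.
            synchronized void notifyDataAvailable() {
                if (numNotifications == 0) {
                    notifyAll();
                }
                ++numNotifications;
            }

            // Consumer side: park only if nothing has arrived since the last reset.
            synchronized void waitForData() throws InterruptedException {
                if (numNotifications == 0) {
                    wait();
                }
                numNotifications = 0;
            }
        }

        public static void main(String[] args) throws InterruptedException {
            Queue<Integer> data = new ConcurrentLinkedQueue<>();
            AvailabilityListener listener = new AvailabilityListener();

            Thread producer =
                    new Thread(
                            () -> {
                                for (int i = 0; i < 3; i++) {
                                    data.add(i);
                                    listener.notifyDataAvailable();
                                }
                            });
            producer.start();

            int received = 0;
            while (received < 3) {
                Integer next = data.poll();
                if (next == null) {
                    // Park instead of busy-spinning, like the reader threads above.
                    listener.waitForData();
                    continue;
                }
                received++;
            }
            producer.join();
            System.out.println("received " + received + " records");
        }
    }

Because the producer enqueues before it notifies, and the consumer always re-polls after waking, no wakeup can be lost even when a notification races with the null poll.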

[flink] 05/13: [FLINK-27908] HsBufferContext ignores repeated startSpilling and release calls instead of failing checkState.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9331e7502e9e8bda10f149e03280e26d5344e471
Author: Weijie Guo <re...@163.com>
AuthorDate: Mon Jul 25 13:40:59 2022 +0800

    [FLINK-27908] HsBufferContext ignores repeated startSpilling and release calls instead of failing checkState.
---
 .../network/partition/hybrid/HsBufferContext.java  | 22 +++++++++++----
 .../hybrid/HsSubpartitionMemoryDataManager.java    | 33 ++++++++++++++--------
 .../partition/hybrid/HsBufferContextTest.java      | 17 +++++------
 3 files changed, 44 insertions(+), 28 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContext.java
index 8feb6fd0c41..4a5f5fd9651 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContext.java
@@ -97,24 +97,34 @@ public class HsBufferContext {
         return Optional.ofNullable(spilledFuture);
     }
 
+    /** Mark buffer status to release. */
     public void release() {
-        checkState(!released, "Release buffer repeatedly is unexpected.");
+        if (isReleased()) {
+            return;
+        }
         released = true;
         // decrease ref count when buffer is released from memory.
         buffer.recycleBuffer();
     }
 
-    public void startSpilling(CompletableFuture<Void> spilledFuture) {
-        checkState(!released, "Buffer is already released.");
-        checkState(
-                !spillStarted && this.spilledFuture == null,
-                "Spill buffer repeatedly is unexpected.");
+    /**
+     * Mark buffer status to startSpilling.
+     *
+     * @param spilledFuture completable future of this buffer's spilling operation.
+     * @return false, if spilling of the buffer has been started before or the buffer has been
+     *     released already; true, otherwise.
+     */
+    public boolean startSpilling(CompletableFuture<Void> spilledFuture) {
+        if (isReleased() || isSpillStarted()) {
+            return false;
+        }
         spillStarted = true;
         this.spilledFuture = spilledFuture;
         // increase ref count when buffer is decided to spill.
         buffer.retainBuffer();
         // decrease ref count when buffer spilling is finished.
         spilledFuture.thenRun(buffer::recycleBuffer);
+        return true;
     }
 
     public void consumed() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
index 56814024911..8084a8883f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
@@ -214,14 +214,16 @@ public class HsSubpartitionMemoryDataManager {
                                 .map(
                                         indexAndChannel -> {
                                             int bufferIndex = indexAndChannel.getBufferIndex();
-                                            HsBufferContext bufferContext =
-                                                    startSpillingBuffer(
-                                                            bufferIndex, spillDoneFuture);
-                                            return new BufferWithIdentity(
-                                                    bufferContext.getBuffer(),
-                                                    bufferIndex,
-                                                    targetChannel);
+                                            return startSpillingBuffer(bufferIndex, spillDoneFuture)
+                                                    .map(
+                                                            (context) ->
+                                                                    new BufferWithIdentity(
+                                                                            context.getBuffer(),
+                                                                            bufferIndex,
+                                                                            targetChannel));
                                         })
+                                .filter(Optional::isPresent)
+                                .map(Optional::get)
                                 .collect(Collectors.toList()));
     }
 
@@ -385,18 +387,25 @@ public class HsSubpartitionMemoryDataManager {
 
     @GuardedBy("subpartitionLock")
     private void releaseBuffer(int bufferIndex) {
-        HsBufferContext bufferContext = checkNotNull(bufferIndexToContexts.remove(bufferIndex));
+        HsBufferContext bufferContext = bufferIndexToContexts.remove(bufferIndex);
+        if (bufferContext == null) {
+            return;
+        }
         bufferContext.release();
         // remove released buffers from head lazy.
         trimHeadingReleasedBuffers(allBuffers);
     }
 
     @GuardedBy("subpartitionLock")
-    private HsBufferContext startSpillingBuffer(
+    private Optional<HsBufferContext> startSpillingBuffer(
             int bufferIndex, CompletableFuture<Void> spillFuture) {
-        HsBufferContext bufferContext = checkNotNull(bufferIndexToContexts.get(bufferIndex));
-        bufferContext.startSpilling(spillFuture);
-        return bufferContext;
+        HsBufferContext bufferContext = bufferIndexToContexts.get(bufferIndex);
+        if (bufferContext == null) {
+            return Optional.empty();
+        }
+        return bufferContext.startSpilling(spillFuture)
+                ? Optional.of(bufferContext)
+                : Optional.empty();
     }
 
     @GuardedBy("subpartitionLock")
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContextTest.java
index a16b811b68f..960357d4127 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContextTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContextTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createBuffer;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link HsBufferContext}. */
@@ -57,10 +58,8 @@ class HsBufferContextTest {
 
     @Test
     void testBufferStartSpillingRepeatedly() {
-        bufferContext.startSpilling(new CompletableFuture<>());
-        assertThatThrownBy(() -> bufferContext.startSpilling(new CompletableFuture<>()))
-                .isInstanceOf(IllegalStateException.class)
-                .hasMessageContaining("Spill buffer repeatedly is unexpected.");
+        assertThat(bufferContext.startSpilling(new CompletableFuture<>())).isTrue();
+        assertThat(bufferContext.startSpilling(new CompletableFuture<>())).isFalse();
     }
 
     @Test
@@ -75,9 +74,9 @@ class HsBufferContextTest {
     @Test
     void testBufferReleaseRepeatedly() {
         bufferContext.release();
-        assertThatThrownBy(() -> bufferContext.release())
-                .isInstanceOf(IllegalStateException.class)
-                .hasMessageContaining("Release buffer repeatedly is unexpected.");
+        assertThatNoException()
+                .as("repeatedly release should only recycle buffer once.")
+                .isThrownBy(() -> bufferContext.release());
     }
 
     @Test
@@ -99,9 +98,7 @@ class HsBufferContextTest {
     @Test
     void testBufferStartSpillOrConsumedAfterReleased() {
         bufferContext.release();
-        assertThatThrownBy(() -> bufferContext.startSpilling(new CompletableFuture<>()))
-                .isInstanceOf(IllegalStateException.class)
-                .hasMessageContaining("Buffer is already released.");
+        assertThat(bufferContext.startSpilling(new CompletableFuture<>())).isFalse();
         assertThatThrownBy(() -> bufferContext.consumed())
                 .isInstanceOf(IllegalStateException.class)
                 .hasMessageContaining("Buffer is already released.");

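The shape of this change: spilling decisions race with downstream consumption and release, so a transition that has already happened (or is no longer possible) is expected rather than exceptional. Instead of failing a precondition, release() becomes idempotent and startSpilling() reports via a boolean whether the transition actually took effect, letting callers filter unspillable buffers out of the batch (as spillSubpartitionBuffers does above with filter(Optional::isPresent)). A simplified sketch of the pattern, not the actual HsBufferContext:

    import java.util.concurrent.CompletableFuture;

    final class SpillableState {
        private boolean released;
        private boolean spillStarted;

        /** Idempotent: the second and later calls are silently ignored. */
        void release() {
            if (released) {
                return;
            }
            released = true;
            // One-shot cleanup (e.g. recycling the buffer) happens exactly once.
        }

        /**
         * @return false if the buffer was already released or spilling already
         *     started; true if this call actually started the spill.
         */
        boolean startSpilling(CompletableFuture<Void> spilledFuture) {
            if (released || spillStarted) {
                return false;
            }
            spillStarted = true;
            // Hook follow-up work (e.g. releasing a reference once persisted).
            spilledFuture.thenRun(() -> {});
            return true;
        }
    }

    public class IdempotentTransitionDemo {
        public static void main(String[] args) {
            SpillableState state = new SpillableState();
            System.out.println(state.startSpilling(new CompletableFuture<>())); // true
            System.out.println(state.startSpilling(new CompletableFuture<>())); // false
            state.release();
            state.release(); // no exception: release is idempotent
            System.out.println(state.startSpilling(new CompletableFuture<>())); // false
        }
    }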

[flink] 13/13: [FLINK-27908] ResultPartitionFactory also supports the HYBRID type.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4a2f3a15903ca365c14368b34b30a6234a51aa5e
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Jul 28 13:59:55 2022 +0800

    [FLINK-27908] ResultPartitionFactory also supports the HYBRID type.
    
    This closes #20371
---
 .../network/partition/ResultPartitionFactory.java  | 22 ++++++++++++++++++++++
 .../io/network/partition/ResultPartitionType.java  |  2 +-
 .../partition/ResultPartitionFactoryTest.java      | 16 ++++++++++++++++
 3 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index e0c6bba9a05..6d47bb3c047 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsResultPartition;
+import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration;
 import org.apache.flink.runtime.shuffle.NettyShuffleUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -213,6 +215,26 @@ public class ResultPartitionFactory {
 
                 partition = blockingPartition;
             }
+        } else if (type == ResultPartitionType.HYBRID) {
+            partition =
+                    new HsResultPartition(
+                            taskNameWithSubtaskAndId,
+                            partitionIndex,
+                            id,
+                            type,
+                            subpartitions.length,
+                            maxParallelism,
+                            batchShuffleReadBufferPool,
+                            batchShuffleReadIOExecutor,
+                            partitionManager,
+                            channelManager.createChannel().getPath(),
+                            networkBufferSize,
+                            HybridShuffleConfiguration.builder(
+                                            numberOfSubpartitions,
+                                            batchShuffleReadBufferPool.getNumBuffersPerRequest())
+                                    .build(),
+                            bufferCompressor,
+                            bufferPoolFactory);
         } else {
             throw new IllegalArgumentException("Unrecognized ResultPartitionType: " + type);
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
index 0cb6eb61341..ee341f50535 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
@@ -89,7 +89,7 @@ public enum ResultPartitionType {
      *
      * <p>Hybrid partitions can be consumed any time, whether fully produced or not.
      */
-    HYBRID(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER);
+    HYBRID(false, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER);
 
     /** Does this partition use a limited number of (network) buffers? */
     private final boolean isBounded;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index f0b31679f37..cd22abb953e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsResultPartition;
 import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
@@ -82,6 +83,12 @@ public class ResultPartitionFactoryTest extends TestLogger {
         assertTrue(resultPartition instanceof SortMergeResultPartition);
     }
 
+    @Test
+    public void testHybridResultPartitionCreated() {
+        ResultPartition resultPartition = createResultPartition(ResultPartitionType.HYBRID);
+        assertTrue(resultPartition instanceof HsResultPartition);
+    }
+
     @Test
     public void testNoReleaseOnConsumptionForBoundedBlockingPartition() {
         final ResultPartition resultPartition = createResultPartition(ResultPartitionType.BLOCKING);
@@ -101,6 +108,15 @@ public class ResultPartitionFactoryTest extends TestLogger {
         assertFalse(resultPartition.isReleased());
     }
 
+    @Test
+    public void testNoReleaseOnConsumptionForHybridPartition() {
+        final ResultPartition resultPartition = createResultPartition(ResultPartitionType.HYBRID);
+
+        resultPartition.onConsumedSubpartition(0);
+
+        assertFalse(resultPartition.isReleased());
+    }
+
     private static ResultPartition createResultPartition(ResultPartitionType partitionType) {
         return createResultPartition(partitionType, Integer.MAX_VALUE);
     }

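For orientation, the factory change boils down to one more branch in a dispatch over ResultPartitionType. A hypothetical, heavily trimmed illustration follows; the real ResultPartitionFactory also wires buffer pools, the batch-shuffle read IO executor, file channels, and compression.

    interface Partition {}

    final class PipelinedPartition implements Partition {}
    final class BlockingPartition implements Partition {}
    final class HybridPartition implements Partition {}

    enum PartitionType { PIPELINED, BLOCKING, HYBRID }

    final class PartitionFactory {
        Partition create(PartitionType type) {
            switch (type) {
                case PIPELINED:
                    return new PipelinedPartition();
                case BLOCKING:
                    return new BlockingPartition();
                case HYBRID: // the branch this commit adds
                    return new HybridPartition();
                default:
                    throw new IllegalArgumentException("Unrecognized type: " + type);
            }
        }
    }

    public class FactoryDemo {
        public static void main(String[] args) {
            Partition p = new PartitionFactory().create(PartitionType.HYBRID);
            // Mirrors testHybridResultPartitionCreated in spirit.
            System.out.println(p instanceof HybridPartition); // true
        }
    }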

[flink] 02/13: [hotfix] HsMemoryDataManager spillAsync's callback should use assertNoException.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 42f05f0cddd45b4ec54f1e7c52796e5e26cfbf6f
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Jul 28 17:19:36 2022 +0800

    [hotfix] HsMemoryDataManager spillAsync's callback should use assertNoException.
---
 .../io/network/partition/hybrid/HsMemoryDataManager.java  | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

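The motivation for the fix below: the future returned by thenAccept(...) was previously dropped, so an exception thrown while updating the file data index would only complete that derived future exceptionally and never surface, and spillingCompleteFuture would never complete. FutureUtils.assertNoException attaches a handler that escalates such failures instead of swallowing them. Here is a simplified stand-in for that guard, assuming only standard CompletableFuture behavior; the real Flink utility escalates through a fatal uncaught-exception handler rather than the calling thread's handler.

    import java.util.concurrent.CompletableFuture;

    final class Futures {
        /** Escalate any exceptional completion instead of dropping it silently. */
        static void assertNoException(CompletableFuture<?> future) {
            future.whenComplete(
                    (ignored, throwable) -> {
                        if (throwable != null) {
                            Thread t = Thread.currentThread();
                            t.getUncaughtExceptionHandler().uncaughtException(t, throwable);
                        }
                    });
        }
    }

    public class AssertNoExceptionDemo {
        public static void main(String[] args) {
            CompletableFuture<Void> f = new CompletableFuture<>();
            // Without the guard, this failure would vanish unobserved.
            Futures.assertNoException(
                    f.thenAccept(v -> { throw new RuntimeException("boom"); }));
            f.complete(null); // "boom" now reaches the uncaught-exception handler
        }
    }
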
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
index b58338b6912..0f0744fe9ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy.Decision;
+import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.SupplierWithException;
 
 import java.io.IOException;
@@ -222,13 +223,13 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
                     // decrease numUnSpillBuffers as this subpartition's buffer is spill.
                     numUnSpillBuffers.getAndAdd(-bufferIndexAndChannels.size());
                 });
-
-        spiller.spillAsync(bufferWithIdentities)
-                .thenAccept(
-                        spilledBuffers -> {
-                            fileDataIndex.addBuffers(spilledBuffers);
-                            spillingCompleteFuture.complete(null);
-                        });
+        FutureUtils.assertNoException(
+                spiller.spillAsync(bufferWithIdentities)
+                        .thenAccept(
+                                spilledBuffers -> {
+                                    fileDataIndex.addBuffers(spilledBuffers);
+                                    spillingCompleteFuture.complete(null);
+                                }));
     }
 
     /**