Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/02 01:57:26 UTC

[flink] 11/13: [FLINK-27908] Introduce HsSubpartitionView based on HsDataView and its implementations.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d0b133814838e3614a41e70b5e6a27b80afa0967
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Jul 28 20:14:52 2022 +0800

    [FLINK-27908] Introduce HsSubpartitionView based on HsDataView and its implementations.
---
 .../io/network/partition/hybrid/HsDataView.java    |  60 ++++
 .../partition/hybrid/HsFileDataManager.java        |  16 +-
 .../partition/hybrid/HsMemoryDataManager.java      |  23 +-
 .../partition/hybrid/HsSubpartitionFileReader.java |   6 +-
 .../hybrid/HsSubpartitionFileReaderImpl.java       |  86 +++++-
 .../hybrid/HsSubpartitionMemoryDataManager.java    |  29 +-
 .../partition/hybrid/HsSubpartitionView.java       | 262 +++++++++++++++++
 .../partition/hybrid/HsFileDataManagerTest.java    |  27 +-
 .../hybrid/HsSubpartitionFileReaderImplTest.java   |  88 +++++-
 .../HsSubpartitionMemoryDataManagerTest.java       |  20 +-
 .../partition/hybrid/HsSubpartitionViewTest.java   | 313 +++++++++++++++++++++
 .../partition/hybrid/TestingHsDataView.java        | 127 +++++++++
 12 files changed, 1011 insertions(+), 46 deletions(-)
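
At a high level, HsSubpartitionView implements ResultSubpartitionView on top of two
HsDataView instances: one backed by memory (HsSubpartitionMemoryDataManager) and one
backed by disk (HsSubpartitionFileReader). A minimal wiring sketch follows; the actual
call site is not part of this commit, so the surrounding names (availabilityListener,
memoryDataManager, fileDataManager, subpartitionId) are assumptions:

    HsSubpartitionView view = new HsSubpartitionView(availabilityListener);
    // both register methods now hand back the HsDataView backing this subpartition
    view.setMemoryDataView(memoryDataManager.registerSubpartitionView(subpartitionId, view));
    view.setDiskDataView(fileDataManager.registerNewSubpartition(subpartitionId, view)); // throws IOException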

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java
new file mode 100644
index 00000000000..c3d4cb9ed83
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+
+import java.util.Optional;
+
+/**
+ * A view for {@link HsSubpartitionView} to find out what data exists in memory or on disk and to
+ * poll that data.
+ */
+public interface HsDataView {
+
+    /**
+     * Try to consume the next buffer.
+     *
+     * <p>Only invoked by consumer thread.
+     *
+     * @param nextBufferToConsume next buffer index to consume.
+     * @return if the target buffer exists, the buffer and the backlog of this subpartition;
+     *     otherwise {@link Optional#empty()}.
+     */
+    Optional<BufferAndBacklog> consumeBuffer(int nextBufferToConsume) throws Throwable;
+
+    /**
+     * Get the {@link DataType} of the next buffer to consume.
+     *
+     * @param nextBufferToConsume next buffer index to consume
+     * @return the next buffer's {@link DataType}, or {@link DataType#NONE} if that buffer cannot
+     *     be found.
+     */
+    DataType peekNextToConsumeDataType(int nextBufferToConsume);
+
+    /**
+     * Get the number of backlog buffers.
+     *
+     * @return backlog of this view's corresponding subpartition.
+     */
+    int getBacklog();
+
+    /** Release this {@link HsDataView} when the related subpartition view is being released. */
+    void releaseDataView();
+}
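
The interface above encodes a pull-based contract: buffers are requested by strictly
increasing index, and an empty Optional means the target buffer is not (yet) available
from this view. A hedged consumer-loop sketch, where dataView and process(...) are
placeholders and error handling is elided (consumeBuffer is declared to throw Throwable):

    int nextBufferIndex = 0;
    Optional<BufferAndBacklog> polled;
    while ((polled = dataView.consumeBuffer(nextBufferIndex)).isPresent()) {
        process(polled.get().buffer()); // consume the payload
        nextBufferIndex++;              // indices advance one by one
    }
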
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
index f37a1ff5164..6070a838110 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
@@ -147,7 +147,7 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
     }
 
     /** This method is only called by the result partition to create a subpartitionFileReader. */
-    public HsSubpartitionFileReader registerNewSubpartition(
+    public HsDataView registerNewSubpartition(
             int subpartitionId, HsSubpartitionViewInternalOperations operation) throws IOException {
         synchronized (lock) {
             checkState(!isReleased, "HsFileDataManager is already released.");
@@ -159,7 +159,8 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
                             dataFileChannel,
                             operation,
                             dataIndex,
-                            hybridShuffleConfiguration.getMaxBuffersReadAhead());
+                            hybridShuffleConfiguration.getMaxBuffersReadAhead(),
+                            this::releaseSubpartitionReader);
 
             allReaders.add(subpartitionReader);
 
@@ -172,6 +173,17 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
         IOUtils.deleteFileQuietly(dataFilePath);
     }
 
+    /**
+     * Release a specific {@link HsSubpartitionFileReader} from this {@link HsFileDataManager}.
+     *
+     * @param subpartitionFileReader the reader to release.
+     */
+    public void releaseSubpartitionReader(HsSubpartitionFileReader subpartitionFileReader) {
+        synchronized (lock) {
+            removeSubpartitionReaders(Collections.singleton(subpartitionFileReader));
+        }
+    }
+
     /** Releases this file data manager and deletes shuffle data after all readers are removed. */
     public void release() {
         synchronized (lock) {
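
Together with the HsSubpartitionFileReaderImpl changes further down, the new releaser
closes the release loop between the view and the file data manager. A comment-trace of
the path, as read from this diff:

    // HsSubpartitionView#releaseAllResources()
    //   -> releaseInternal(null)
    //     -> diskDataView.releaseDataView()                  // HsDataView
    //       -> fileReaderReleaser.accept(this)               // HsSubpartitionFileReaderImpl
    //         -> HsFileDataManager#releaseSubpartitionReader(reader)
    //           -> removeSubpartitionReaders(Collections.singleton(reader))
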
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
index 9bf57b51c4b..11228fe7206 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
@@ -121,7 +121,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
      * #subpartitionViewOperationsMap}. It is used to obtain the consumption progress of the
      * subpartition.
      */
-    public void registerSubpartitionView(
+    public HsDataView registerSubpartitionView(
             int subpartitionId, HsSubpartitionViewInternalOperations viewOperations) {
         HsSubpartitionViewInternalOperations oldView =
                 subpartitionViewOperationsMap.put(subpartitionId, viewOperations);
@@ -130,6 +130,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
                     "subpartition : {} register subpartition view will replace old view. ",
                     subpartitionId);
         }
+        return getSubpartitionMemoryDataManager(subpartitionId);
     }
 
     /** Close this {@link HsMemoryDataManager}, it means no data can append to memory. */
@@ -318,24 +319,4 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
             lock.unlock();
         }
     }
-
-    /** Integrate the buffer and dataType of next buffer. */
-    public static class BufferAndNextDataType {
-        private final Buffer buffer;
-
-        private final Buffer.DataType nextDataType;
-
-        public BufferAndNextDataType(Buffer buffer, Buffer.DataType nextDataType) {
-            this.buffer = buffer;
-            this.nextDataType = nextDataType;
-        }
-
-        public Buffer getBuffer() {
-            return buffer;
-        }
-
-        public Buffer.DataType getNextDataType() {
-            return nextDataType;
-        }
-    }
 }
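
With BufferAndNextDataType removed, memory-side consumption now flows through the same
HsDataView/BufferAndBacklog types as the disk side. A sketch of the new call shape (the
call site itself is assumed, not shown in this diff):

    HsDataView memoryView =
            memoryDataManager.registerSubpartitionView(subpartitionId, viewOperations);
    // error handling elided; consumeBuffer is declared to throw Throwable
    Optional<BufferAndBacklog> next = memoryView.consumeBuffer(0);
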
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java
index e582be2a620..fbc044d6da0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.util.Queue;
+import java.util.function.Consumer;
 
 /**
  * This component is responsible for reading data from disk for a specific subpartition.
@@ -31,7 +32,7 @@ import java.util.Queue;
  * <p>In order to access the disk as sequentially as possible, {@link HsSubpartitionFileReader} needs
  * to be able to compare priorities.
  */
-public interface HsSubpartitionFileReader extends Comparable<HsSubpartitionFileReader> {
+public interface HsSubpartitionFileReader extends Comparable<HsSubpartitionFileReader>, HsDataView {
     /** Do prep work before this {@link HsSubpartitionFileReader} is scheduled to read data. */
     void prepareForScheduling();
 
@@ -58,6 +59,7 @@ public interface HsSubpartitionFileReader extends Comparable<HsSubpartitionFileR
                 FileChannel dataFileChannel,
                 HsSubpartitionViewInternalOperations operation,
                 HsFileDataIndex dataIndex,
-                int maxBuffersReadAhead);
+                int maxBuffersReadAhead,
+                Consumer<HsSubpartitionFileReader> fileReaderReleaser);
     }
 }
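
Factory remains a single-method interface, so callers can still supply it as a lambda;
the only change is the extra releaser parameter. A sketch, assuming the implementation
class introduced in this commit:

    HsSubpartitionFileReader.Factory factory =
            (subpartitionId, dataFileChannel, operation, dataIndex,
                    maxBuffersReadAhead, fileReaderReleaser) ->
                    new HsSubpartitionFileReaderImpl(
                            subpartitionId, dataFileChannel, operation, dataIndex,
                            maxBuffersReadAhead, fileReaderReleaser);
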
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
index ecf02260dd1..a355765ac35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
@@ -23,6 +23,8 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
+import org.apache.flink.util.ExceptionUtils;
 
 import javax.annotation.Nullable;
 
@@ -34,6 +36,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.function.Consumer;
 
 import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.positionToNextBuffer;
 import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.readFromByteChannel;
@@ -62,6 +65,8 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
 
     private final Deque<BufferIndexOrError> loadedBuffers = new LinkedBlockingDeque<>();
 
+    private final Consumer<HsSubpartitionFileReader> fileReaderReleaser;
+
     private volatile boolean isFailed;
 
     public HsSubpartitionFileReaderImpl(
@@ -69,12 +74,14 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
             FileChannel dataFileChannel,
             HsSubpartitionViewInternalOperations operations,
             HsFileDataIndex dataIndex,
-            int maxBufferReadAhead) {
+            int maxBufferReadAhead,
+            Consumer<HsSubpartitionFileReader> fileReaderReleaser) {
         this.subpartitionId = subpartitionId;
         this.dataFileChannel = dataFileChannel;
         this.operations = operations;
         this.bufferIndexManager = new BufferIndexManager(maxBufferReadAhead);
         this.cachedRegionManager = new CachedRegionManager(subpartitionId, dataIndex);
+        this.fileReaderReleaser = fileReaderReleaser;
     }
 
     @Override
@@ -192,10 +199,77 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
         return loadedBuffers;
     }
 
+    @Override
+    public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(int nextBufferToConsume)
+            throws Throwable {
+        if (!checkAndGetFirstBufferIndexOrError(nextBufferToConsume).isPresent()) {
+            return Optional.empty();
+        }
+
+        // the check above already ensured that the peeked element is neither null nor a throwable.
+        BufferIndexOrError current = checkNotNull(loadedBuffers.poll());
+
+        BufferIndexOrError next = loadedBuffers.peek();
+
+        Buffer.DataType nextDataType = next == null ? Buffer.DataType.NONE : next.getDataType();
+        int backlog = loadedBuffers.size();
+        int bufferIndex = current.getIndex();
+        Buffer buffer =
+                current.getBuffer()
+                        .orElseThrow(
+                                () ->
+                                        new NullPointerException(
+                                                "Got a bufferIndexOrError that is neither a throwable nor a buffer, which is not allowed"));
+        return Optional.of(
+                ResultSubpartition.BufferAndBacklog.fromBufferAndLookahead(
+                        buffer, nextDataType, backlog, bufferIndex));
+    }
+
+    @Override
+    public Buffer.DataType peekNextToConsumeDataType(int nextBufferToConsume) {
+        Buffer.DataType dataType = Buffer.DataType.NONE;
+        try {
+            dataType =
+                    checkAndGetFirstBufferIndexOrError(nextBufferToConsume)
+                            .map(BufferIndexOrError::getDataType)
+                            .orElse(Buffer.DataType.NONE);
+        } catch (Throwable throwable) {
+            ExceptionUtils.rethrow(throwable);
+        }
+        return dataType;
+    }
+
+    @Override
+    public void releaseDataView() {
+        fileReaderReleaser.accept(this);
+    }
+
+    @Override
+    public int getBacklog() {
+        return loadedBuffers.size();
+    }
+
     // ------------------------------------------------------------------------
     //  Internal Methods
     // ------------------------------------------------------------------------
 
+    private Optional<BufferIndexOrError> checkAndGetFirstBufferIndexOrError(int expectedBufferIndex)
+            throws Throwable {
+        if (loadedBuffers.isEmpty()) {
+            return Optional.empty();
+        }
+
+        BufferIndexOrError peek = loadedBuffers.peek();
+
+        if (peek.getThrowable().isPresent()) {
+            throw peek.getThrowable().get();
+        } else if (peek.getIndex() != expectedBufferIndex) {
+            return Optional.empty();
+        }
+
+        return Optional.of(peek);
+    }
+
     private void moveFileOffsetToBuffer(int bufferIndex) throws IOException {
         Tuple2<Integer, Long> indexAndOffset =
                 cachedRegionManager.getNumSkipAndFileOffset(bufferIndex);
@@ -403,9 +477,15 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
                 FileChannel dataFileChannel,
                 HsSubpartitionViewInternalOperations operation,
                 HsFileDataIndex dataIndex,
-                int maxBuffersReadAhead) {
+                int maxBuffersReadAhead,
+                Consumer<HsSubpartitionFileReader> fileReaderReleaser) {
             return new HsSubpartitionFileReaderImpl(
-                    subpartitionId, dataFileChannel, operation, dataIndex, maxBuffersReadAhead);
+                    subpartitionId,
+                    dataFileChannel,
+                    operation,
+                    dataIndex,
+                    maxBuffersReadAhead,
+                    fileReaderReleaser);
         }
     }
 }
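
Note that the consume path is strictly sequential: checkAndGetFirstBufferIndexOrError
only hands out the head of loadedBuffers when its index matches the expected one, and
rethrows any queued failure first. Illustrative calls (headIndex and wrongIndex are
hypothetical values):

    subpartitionFileReader.consumeBuffer(wrongIndex); // Optional.empty(): head index differs
    subpartitionFileReader.consumeBuffer(headIndex);  // Optional with the head buffer and backlog
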
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
index b5079e7df21..17b7e833e29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.ConsumeStatus;
 import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.SpillStatus;
 import org.apache.flink.util.function.SupplierWithException;
@@ -54,7 +55,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * This class is responsible for managing the data in a single subpartition. One {@link
  * HsMemoryDataManager} will hold multiple {@link HsSubpartitionMemoryDataManager}.
  */
-public class HsSubpartitionMemoryDataManager {
+public class HsSubpartitionMemoryDataManager implements HsDataView {
     private final int targetChannel;
 
     private final int bufferSize;
@@ -108,22 +109,24 @@ public class HsSubpartitionMemoryDataManager {
     @SuppressWarnings("FieldAccessNotGuarded")
     // Note that callWithLock ensures that the code block is guarded by resultPartitionReadLock and
     // subpartitionLock.
+    @Override
     public DataType peekNextToConsumeDataType(int nextToConsumeIndex) {
         return callWithLock(() -> peekNextToConsumeDataTypeInternal(nextToConsumeIndex));
     }
 
     /**
      * Check whether the head of {@link #unConsumedBuffers} is the buffer to be consumed. If so,
-     * return the buffer and next data type.
+     * return the buffer and backlog.
      *
      * @param toConsumeIndex index of buffer to be consumed.
      * @return If the head of {@link #unConsumedBuffers} is the target, return an optional of the buffer
-     *     and next data type. Otherwise, return {@link Optional#empty()}.
+     *     and backlog. Otherwise, return {@link Optional#empty()}.
      */
     @SuppressWarnings("FieldAccessNotGuarded")
     // Note that callWithLock ensures that the code block is guarded by resultPartitionReadLock and
     // subpartitionLock.
-    public Optional<HsMemoryDataManager.BufferAndNextDataType> consumeBuffer(int toConsumeIndex) {
+    @Override
+    public Optional<BufferAndBacklog> consumeBuffer(int toConsumeIndex) {
         Optional<Tuple2<HsBufferContext, DataType>> bufferAndNextDataType =
                 callWithLock(
                         () -> {
@@ -145,8 +148,22 @@ public class HsSubpartitionMemoryDataManager {
                                 tuple.f0.getBufferIndexAndChannel()));
         return bufferAndNextDataType.map(
                 tuple ->
-                        new HsMemoryDataManager.BufferAndNextDataType(
-                                tuple.f0.getBuffer(), tuple.f1));
+                        new BufferAndBacklog(
+                                tuple.f0.getBuffer(), getBacklog(), tuple.f1, toConsumeIndex));
+    }
+
+    @SuppressWarnings("FieldAccessNotGuarded")
+    // Reads the size of unConsumedBuffers without synchronization to provide the in-memory
+    // backlog. This may make the result greater than or equal to the actual backlog, but
+    // obtaining an accurate backlog would bring too much extra overhead.
+    @Override
+    public int getBacklog() {
+        return unConsumedBuffers.size();
+    }
+
+    @Override
+    public void releaseDataView() {
+        // nothing to do for memory data.
     }
 
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
new file mode 100644
index 00000000000..5aa7e8bac0b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The read view of an HsResultPartition; data can be read from memory or disk. */
+public class HsSubpartitionView
+        implements ResultSubpartitionView, HsSubpartitionViewInternalOperations {
+    private final BufferAvailabilityListener availabilityListener;
+    private final Object lock = new Object();
+
+    /** Index of last consumed buffer. */
+    @GuardedBy("lock")
+    private int lastConsumedBufferIndex = -1;
+
+    @GuardedBy("lock")
+    private boolean needNotify = false;
+
+    @Nullable
+    @GuardedBy("lock")
+    private Buffer.DataType cachedNextDataType = null;
+
+    @Nullable
+    @GuardedBy("lock")
+    private Throwable failureCause = null;
+
+    @GuardedBy("lock")
+    private boolean isReleased = false;
+
+    @Nullable
+    @GuardedBy("lock")
+    // diskDataView can be null only before initialization.
+    private HsDataView diskDataView;
+
+    @Nullable
+    @GuardedBy("lock")
+    // memoryDataView can be null only before initialization.
+    private HsDataView memoryDataView;
+
+    public HsSubpartitionView(BufferAvailabilityListener availabilityListener) {
+        this.availabilityListener = availabilityListener;
+    }
+
+    @Nullable
+    @Override
+    public BufferAndBacklog getNextBuffer() {
+        synchronized (lock) {
+            try {
+                checkNotNull(diskDataView, "disk data view must not be null.");
+                checkNotNull(memoryDataView, "memory data view must not be null.");
+
+                Optional<BufferAndBacklog> bufferToConsume = tryReadFromDisk();
+                if (!bufferToConsume.isPresent()) {
+                    bufferToConsume = memoryDataView.consumeBuffer(lastConsumedBufferIndex + 1);
+                }
+                updateConsumingStatus(bufferToConsume);
+                return bufferToConsume.orElse(null);
+            } catch (Throwable cause) {
+                releaseInternal(cause);
+                return null;
+            }
+        }
+    }
+
+    @Override
+    public void notifyDataAvailable() {
+        boolean notifyDownStream = false;
+        synchronized (lock) {
+            if (isReleased) {
+                return;
+            }
+            if (needNotify) {
+                notifyDownStream = true;
+                needNotify = false;
+            }
+        }
+        // notify outside of lock to avoid deadlock
+        if (notifyDownStream) {
+            availabilityListener.notifyDataAvailable();
+        }
+    }
+
+    @Override
+    public AvailabilityWithBacklog getAvailabilityAndBacklog(int numCreditsAvailable) {
+        synchronized (lock) {
+            boolean availability = numCreditsAvailable > 0;
+            if (numCreditsAvailable <= 0
+                    && cachedNextDataType != null
+                    && cachedNextDataType == Buffer.DataType.EVENT_BUFFER) {
+                availability = true;
+            }
+            return new AvailabilityWithBacklog(availability, diskDataView.getBacklog());
+        }
+    }
+
+    @Override
+    public void releaseAllResources() throws IOException {
+        releaseInternal(null);
+    }
+
+    @Override
+    public boolean isReleased() {
+        synchronized (lock) {
+            return isReleased;
+        }
+    }
+
+    @Override
+    public int getConsumingOffset() {
+        synchronized (lock) {
+            return lastConsumedBufferIndex;
+        }
+    }
+
+    @Override
+    public Throwable getFailureCause() {
+        synchronized (lock) {
+            return failureCause;
+        }
+    }
+
+    /**
+     * Set the disk {@link HsDataView} for this subpartition. This method is only called when the
+     * {@link HsSubpartitionFileReader} is created.
+     */
+    void setDiskDataView(HsDataView diskDataView) {
+        synchronized (lock) {
+            checkState(this.diskDataView == null, "repeatedly set disk data view is not allowed.");
+            this.diskDataView = diskDataView;
+        }
+    }
+
+    /**
+     * Set the memory {@link HsDataView} for this subpartition. This method is only called when
+     * the subpartition view is registered to the {@link HsMemoryDataManager}.
+     */
+    void setMemoryDataView(HsDataView memoryDataView) {
+        synchronized (lock) {
+            checkState(
+                    this.memoryDataView == null, "repeatedly set memory data view is not allowed.");
+            this.memoryDataView = memoryDataView;
+        }
+    }
+
+    @Override
+    public void resumeConsumption() {
+        throw new UnsupportedOperationException("resumeConsumption should never be called.");
+    }
+
+    @Override
+    public void acknowledgeAllDataProcessed() {
+        // in case of bounded partitions there is no upstream to acknowledge; we simply ignore
+        // the ack, as there are no checkpoints
+    }
+
+    @SuppressWarnings("FieldAccessNotGuarded")
+    @Override
+    public int unsynchronizedGetNumberOfQueuedBuffers() {
+        return diskDataView.getBacklog();
+    }
+
+    @SuppressWarnings("FieldAccessNotGuarded")
+    @Override
+    public int getNumberOfQueuedBuffers() {
+        return diskDataView.getBacklog();
+    }
+
+    @Override
+    public void notifyNewBufferSize(int newBufferSize) {
+        throw new UnsupportedOperationException("Method should never be called.");
+    }
+
+    // -------------------------------
+    //       Internal Methods
+    // -------------------------------
+
+    @GuardedBy("lock")
+    private Optional<BufferAndBacklog> tryReadFromDisk() throws Throwable {
+        final int nextBufferIndexToConsume = lastConsumedBufferIndex + 1;
+        return checkNotNull(diskDataView)
+                .consumeBuffer(nextBufferIndexToConsume)
+                .map(
+                        bufferAndBacklog -> {
+                            if (bufferAndBacklog.getNextDataType() == Buffer.DataType.NONE) {
+                                return new BufferAndBacklog(
+                                        bufferAndBacklog.buffer(),
+                                        bufferAndBacklog.buffersInBacklog(),
+                                        checkNotNull(memoryDataView)
+                                                .peekNextToConsumeDataType(
+                                                        nextBufferIndexToConsume + 1),
+                                        bufferAndBacklog.getSequenceNumber());
+                            }
+                            return bufferAndBacklog;
+                        });
+    }
+
+    @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+    @GuardedBy("lock")
+    private void updateConsumingStatus(Optional<BufferAndBacklog> bufferAndBacklog) {
+        assert Thread.holdsLock(lock);
+        // if a buffer was consumed, update and check the consuming offset
+        if (bufferAndBacklog.isPresent()) {
+            ++lastConsumedBufferIndex;
+            checkState(bufferAndBacklog.get().getSequenceNumber() == lastConsumedBufferIndex);
+        }
+
+        // update need-notify
+        boolean dataAvailable =
+                bufferAndBacklog.map(BufferAndBacklog::isDataAvailable).orElse(false);
+        needNotify = !dataAvailable;
+        // update cached next data type
+        cachedNextDataType = bufferAndBacklog.map(BufferAndBacklog::getNextDataType).orElse(null);
+    }
+
+    private void releaseInternal(@Nullable Throwable throwable) {
+        boolean releaseSubpartitionReader = false;
+        synchronized (lock) {
+            if (isReleased) {
+                return;
+            }
+            isReleased = true;
+            failureCause = throwable;
+            if (diskDataView != null) {
+                releaseSubpartitionReader = true;
+            }
+        }
+        // release subpartition reader outside of lock to avoid deadlock.
+        if (releaseSubpartitionReader) {
+            //noinspection FieldAccessNotGuarded
+            diskDataView.releaseDataView();
+        }
+    }
+}
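
Putting the pieces together, a minimal consumption sketch against the new view (the
listener and the two data views are assumed to be created elsewhere):

    HsSubpartitionView view = new HsSubpartitionView(listener);
    view.setMemoryDataView(memoryView); // HsDataView over in-memory buffers
    view.setDiskDataView(diskView);     // HsDataView over spilled buffers

    BufferAndBacklog next = view.getNextBuffer(); // disk is tried first, then memory
    if (next != null) {
        // getConsumingOffset() has advanced to next.getSequenceNumber()
    }
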
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
index 1a39e14ca02..1a2e28959a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.io.network.partition.hybrid;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
 import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.util.function.BiConsumerWithException;
@@ -40,6 +42,7 @@ import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.time.Duration;
 import java.util.ArrayDeque;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
@@ -396,6 +399,27 @@ class HsFileDataManagerTest {
             this.failConsumer = failConsumer;
         }
 
+        @Override
+        public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(
+                int nextBufferToConsume) {
+            return Optional.empty();
+        }
+
+        @Override
+        public Buffer.DataType peekNextToConsumeDataType(int nextBufferToConsume) {
+            return Buffer.DataType.NONE;
+        }
+
+        @Override
+        public int getBacklog() {
+            return 0;
+        }
+
+        @Override
+        public void releaseDataView() {
+            // do nothing.
+        }
+
         /** Factory for {@link TestingHsSubpartitionFileReader}. */
         private static class Factory implements HsSubpartitionFileReader.Factory {
             private final Queue<HsSubpartitionFileReader> allReaders = new ArrayDeque<>();
@@ -406,7 +430,8 @@ class HsFileDataManagerTest {
                     FileChannel dataFileChannel,
                     HsSubpartitionViewInternalOperations operation,
                     HsFileDataIndex dataIndex,
-                    int maxBuffersReadAhead) {
+                    int maxBuffersReadAhead,
+                    Consumer<HsSubpartitionFileReader> fileReaderReleaser) {
                 return checkNotNull(allReaders.poll());
             }
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
index 36a0833163d..d25a5a34fb3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
@@ -360,6 +361,86 @@ class HsSubpartitionFileReaderImplTest {
         assertThat(fileReader1).isGreaterThan(fileReader2);
     }
 
+    @Test
+    void testConsumeBuffer() throws Throwable {
+        TestingSubpartitionViewInternalOperation viewNotifier =
+                new TestingSubpartitionViewInternalOperation();
+        HsSubpartitionFileReaderImpl subpartitionFileReader =
+                createSubpartitionFileReader(0, viewNotifier);
+
+        // if no data has been preloaded into the file reader, return Optional.empty.
+        assertThat(subpartitionFileReader.consumeBuffer(0)).isNotPresent();
+
+        // buffers in file: (0-0, 0-1)
+        writeDataToFile(0, 0, 0, 2);
+
+        Queue<MemorySegment> memorySegments = createsMemorySegments(2);
+        // trigger reading, adding buffers to the queue.
+        subpartitionFileReader.readBuffers(memorySegments, (ignore) -> {});
+
+        // if nextBufferToConsume is not equal to the peek element's index, return Optional.empty.
+        assertThat(subpartitionFileReader.consumeBuffer(10)).isNotPresent();
+
+        assertThat(subpartitionFileReader.consumeBuffer(0))
+                .hasValueSatisfying(
+                        (bufferAndBacklog -> {
+                            assertThat(bufferAndBacklog.getNextDataType())
+                                    .isEqualTo(DataType.EVENT_BUFFER);
+                            assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(0);
+                            // first buffer's data is 0.
+                            assertThat(
+                                            bufferAndBacklog
+                                                    .buffer()
+                                                    .getNioBufferReadable()
+                                                    .order(ByteOrder.nativeOrder())
+                                                    .getInt())
+                                    .isEqualTo(0);
+                        }));
+    }
+
+    @Test
+    void testPeekNextToConsumeDataTypeOrConsumeBufferThrowException() {
+        TestingSubpartitionViewInternalOperation viewNotifier =
+                new TestingSubpartitionViewInternalOperation();
+        HsSubpartitionFileReaderImpl subpartitionFileReader =
+                createSubpartitionFileReader(0, viewNotifier);
+
+        subpartitionFileReader.fail(new RuntimeException("expected exception."));
+
+        assertThatThrownBy(() -> subpartitionFileReader.peekNextToConsumeDataType(0))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("expected exception.");
+
+        assertThatThrownBy(() -> subpartitionFileReader.consumeBuffer(0))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("expected exception.");
+    }
+
+    @Test
+    void testPeekNextToConsumeDataType() throws Throwable {
+        TestingSubpartitionViewInternalOperation viewNotifier =
+                new TestingSubpartitionViewInternalOperation();
+        HsSubpartitionFileReaderImpl subpartitionFileReader =
+                createSubpartitionFileReader(0, viewNotifier);
+
+        // if no data has been preloaded into the file reader, return DataType.NONE.
+        assertThat(subpartitionFileReader.peekNextToConsumeDataType(0)).isEqualTo(DataType.NONE);
+
+        // buffers in file: (0-0, 0-1)
+        writeDataToFile(0, 0, 2);
+
+        Queue<MemorySegment> memorySegments = createsMemorySegments(2);
+        // trigger reading, adding buffers to the queue.
+        subpartitionFileReader.readBuffers(memorySegments, (ignore) -> {});
+
+        // if nextBufferToConsume is not equal to the peek element's index, return DataType.NONE.
+        assertThat(subpartitionFileReader.peekNextToConsumeDataType(10)).isEqualTo(DataType.NONE);
+
+        // if nextBufferToConsume is equal to the peek element's index, return the real DataType.
+        assertThat(subpartitionFileReader.peekNextToConsumeDataType(0))
+                .isEqualTo(DataType.DATA_BUFFER);
+    }
+
     private static void checkData(HsSubpartitionFileReaderImpl fileReader, int... expectedData) {
         assertThat(fileReader.getLoadedBuffers()).hasSameSizeAs(expectedData);
         for (int data : expectedData) {
@@ -383,7 +464,12 @@ class HsSubpartitionFileReaderImplTest {
     private HsSubpartitionFileReaderImpl createSubpartitionFileReader(
             int targetChannel, HsSubpartitionViewInternalOperations operations) {
         return new HsSubpartitionFileReaderImpl(
-                targetChannel, dataFileChannel, operations, diskIndex, MAX_BUFFERS_READ_AHEAD);
+                targetChannel,
+                dataFileChannel,
+                operations,
+                diskIndex,
+                MAX_BUFFERS_READ_AHEAD,
+                (ignore) -> {});
     }
 
     private static FileChannel openFileChannel(Path path) throws IOException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java
index a15dfddc9f2..f1b16d0d231 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManager.BufferAndNextDataType;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.ConsumeStatus;
 import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.SpillStatus;
 
@@ -372,14 +372,14 @@ class HsSubpartitionMemoryDataManagerTest {
 
     private static void checkConsumedBufferAndNextDataType(
             List<Tuple2<Integer, Buffer.DataType>> expectedRecords,
-            List<Optional<BufferAndNextDataType>> bufferAndNextDataTypesOpt) {
-        checkArgument(expectedRecords.size() == bufferAndNextDataTypesOpt.size());
-        for (int i = 0; i < bufferAndNextDataTypesOpt.size(); i++) {
+            List<Optional<BufferAndBacklog>> bufferAndBacklogOpt) {
+        checkArgument(expectedRecords.size() == bufferAndBacklogOpt.size());
+        for (int i = 0; i < bufferAndBacklogOpt.size(); i++) {
             final int index = i;
-            assertThat(bufferAndNextDataTypesOpt.get(index))
+            assertThat(bufferAndBacklogOpt.get(index))
                     .hasValueSatisfying(
-                            (bufferAndNextDataType -> {
-                                Buffer buffer = bufferAndNextDataType.getBuffer();
+                            (bufferAndBacklog -> {
+                                Buffer buffer = bufferAndBacklog.buffer();
                                 int value =
                                         buffer.getNioBufferReadable()
                                                 .order(ByteOrder.LITTLE_ENDIAN)
@@ -387,11 +387,11 @@ class HsSubpartitionMemoryDataManagerTest {
                                 Buffer.DataType dataType = buffer.getDataType();
                                 assertThat(value).isEqualTo(expectedRecords.get(index).f0);
                                 assertThat(dataType).isEqualTo(expectedRecords.get(index).f1);
-                                if (index != bufferAndNextDataTypesOpt.size() - 1) {
-                                    assertThat(bufferAndNextDataType.getNextDataType())
+                                if (index != bufferAndBacklogOpt.size() - 1) {
+                                    assertThat(bufferAndBacklog.getNextDataType())
                                             .isEqualTo(expectedRecords.get(index + 1).f1);
                                 } else {
-                                    assertThat(bufferAndNextDataType.getNextDataType())
+                                    assertThat(bufferAndBacklog.getNextDataType())
                                             .isEqualTo(Buffer.DataType.NONE);
                                 }
                             }));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
new file mode 100644
index 00000000000..77a797a80f1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView.AvailabilityWithBacklog;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link HsSubpartitionView}. */
+class HsSubpartitionViewTest {
+    @Test
+    void testGetNextBufferFromDisk() {
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+
+        BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(0, DataType.DATA_BUFFER, 0);
+        CompletableFuture<Void> consumeBufferFromMemoryFuture = new CompletableFuture<>();
+        TestingHsDataView diskDataView =
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (bufferToConsume) -> Optional.of(bufferAndBacklog))
+                        .build();
+        TestingHsDataView memoryDataView =
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (ignore) -> {
+                                    consumeBufferFromMemoryFuture.complete(null);
+                                    return Optional.empty();
+                                })
+                        .build();
+        subpartitionView.setDiskDataView(diskDataView);
+        subpartitionView.setMemoryDataView(memoryDataView);
+
+        BufferAndBacklog nextBuffer = subpartitionView.getNextBuffer();
+        assertThat(consumeBufferFromMemoryFuture).isNotCompleted();
+        assertThat(nextBuffer).isSameAs(bufferAndBacklog);
+    }
+
+    @Test
+    void testGetNextBufferFromDiskNextDataTypeIsNone() {
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+        BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(0, DataType.NONE, 0);
+
+        TestingHsDataView diskDataView =
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (bufferToConsume) -> Optional.of(bufferAndBacklog))
+                        .build();
+
+        TestingHsDataView memoryDataView =
+                TestingHsDataView.builder()
+                        .setPeekNextToConsumeDataTypeFunction(
+                                (bufferToConsume) -> {
+                                    assertThat(bufferToConsume).isEqualTo(1);
+                                    return DataType.EVENT_BUFFER;
+                                })
+                        .build();
+        subpartitionView.setDiskDataView(diskDataView);
+        subpartitionView.setMemoryDataView(memoryDataView);
+
+        BufferAndBacklog nextBuffer = subpartitionView.getNextBuffer();
+        assertThat(nextBuffer).isNotNull();
+        assertThat(nextBuffer.buffer()).isSameAs(bufferAndBacklog.buffer());
+        assertThat(nextBuffer.buffersInBacklog()).isEqualTo(bufferAndBacklog.buffersInBacklog());
+        assertThat(nextBuffer.getSequenceNumber()).isEqualTo(bufferAndBacklog.getSequenceNumber());
+        assertThat(nextBuffer.getNextDataType()).isEqualTo(DataType.EVENT_BUFFER);
+    }
+
+    @Test
+    void testGetNextBufferFromMemory() {
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+
+        BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(0, DataType.DATA_BUFFER, 0);
+        TestingHsDataView memoryDataView =
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (bufferToConsume) -> Optional.of(bufferAndBacklog))
+                        .build();
+        TestingHsDataView diskDataView =
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction((bufferToConsume) -> Optional.empty())
+                        .build();
+        subpartitionView.setDiskDataView(diskDataView);
+        subpartitionView.setMemoryDataView(memoryDataView);
+
+        BufferAndBacklog nextBuffer = subpartitionView.getNextBuffer();
+        assertThat(nextBuffer).isSameAs(bufferAndBacklog);
+    }
+
+    @Test
+    void testGetNextBufferThrowException() {
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+
+        TestingHsDataView diskDataView =
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (nextToConsume) -> {
+                                    throw new RuntimeException("expected exception.");
+                                })
+                        .build();
+        subpartitionView.setDiskDataView(diskDataView);
+        subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
+
+        subpartitionView.getNextBuffer();
+        assertThat(subpartitionView.getFailureCause())
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("expected exception.");
+        assertThat(subpartitionView.isReleased()).isTrue();
+    }
+
+    @Test
+    void testNotifyDataAvailableNeedNotify() {
+        CompletableFuture<Void> notifyAvailableFuture = new CompletableFuture<>();
+        HsSubpartitionView subpartitionView =
+                createSubpartitionView(() -> notifyAvailableFuture.complete(null));
+
+        TestingHsDataView memoryDataView =
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (bufferToConsume) ->
+                                        Optional.of(createBufferAndBacklog(0, DataType.NONE, 0)))
+                        .build();
+        subpartitionView.setMemoryDataView(memoryDataView);
+        subpartitionView.setDiskDataView(TestingHsDataView.NO_OP);
+
+        subpartitionView.getNextBuffer();
+        subpartitionView.notifyDataAvailable();
+        assertThat(notifyAvailableFuture).isCompleted();
+    }
+
+    @Test
+    void testNotifyDataAvailableNotNeedNotify() {
+        CompletableFuture<Void> notifyAvailableFuture = new CompletableFuture<>();
+        HsSubpartitionView subpartitionView =
+                createSubpartitionView(() -> notifyAvailableFuture.complete(null));
+
+        TestingHsDataView memoryDataView =
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (bufferToConsume) ->
+                                        Optional.of(
+                                                createBufferAndBacklog(0, DataType.DATA_BUFFER, 0)))
+                        .build();
+        subpartitionView.setMemoryDataView(memoryDataView);
+        subpartitionView.setDiskDataView(TestingHsDataView.NO_OP);
+
+        subpartitionView.getNextBuffer();
+        subpartitionView.notifyDataAvailable();
+        assertThat(notifyAvailableFuture).isNotCompleted();
+    }
+
+    @Test
+    void testGetAvailabilityAndBacklogPositiveCredit() {
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+        subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
+
+        final int backlog = 2;
+        subpartitionView.setDiskDataView(
+                TestingHsDataView.builder().setGetBacklogSupplier(() -> backlog).build());
+        AvailabilityWithBacklog availabilityAndBacklog =
+                subpartitionView.getAvailabilityAndBacklog(1);
+        assertThat(availabilityAndBacklog.getBacklog()).isEqualTo(backlog);
+        // with positive credit, the view is always available.
+        assertThat(availabilityAndBacklog.isAvailable()).isTrue();
+    }
+
+    @Test
+    void testGetAvailabilityAndBacklogNonPositiveCreditNextIsData() {
+        final int backlog = 2;
+
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+        subpartitionView.setMemoryDataView(
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (nextToConsume) ->
+                                        Optional.of(
+                                                createBufferAndBacklog(
+                                                        backlog, DataType.DATA_BUFFER, 0)))
+                        .build());
+        subpartitionView.setDiskDataView(
+                TestingHsDataView.builder().setGetBacklogSupplier(() -> backlog).build());
+
+        subpartitionView.getNextBuffer();
+
+        AvailabilityWithBacklog availabilityAndBacklog =
+                subpartitionView.getAvailabilityAndBacklog(0);
+        assertThat(availabilityAndBacklog.getBacklog()).isEqualTo(backlog);
+        // if credit is non-positive, only an event can be available.
+        assertThat(availabilityAndBacklog.isAvailable()).isFalse();
+    }
+
+    @Test
+    void testGetAvailabilityAndBacklogNonPositiveCreditNextIsEvent() {
+        final int backlog = 2;
+
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+        subpartitionView.setMemoryDataView(
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (nextToConsume) ->
+                                        Optional.of(
+                                                createBufferAndBacklog(
+                                                        backlog, DataType.EVENT_BUFFER, 0)))
+                        .build());
+        subpartitionView.setDiskDataView(
+                TestingHsDataView.builder().setGetBacklogSupplier(() -> backlog).build());
+
+        subpartitionView.getNextBuffer();
+
+        AvailabilityWithBacklog availabilityAndBacklog =
+                subpartitionView.getAvailabilityAndBacklog(0);
+        assertThat(availabilityAndBacklog.getBacklog()).isEqualTo(backlog);
+        // if credit is non-positive, only an event can be available.
+        assertThat(availabilityAndBacklog.isAvailable()).isTrue();
+    }
+
+    @Test
+    void testRelease() throws Exception {
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+        CompletableFuture<Void> releaseDataViewFuture = new CompletableFuture<>();
+        TestingHsDataView diskDataView =
+                TestingHsDataView.builder()
+                        .setReleaseDataViewRunnable(() -> releaseDataViewFuture.complete(null))
+                        .build();
+        subpartitionView.setDiskDataView(diskDataView);
+        subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
+        subpartitionView.releaseAllResources();
+        assertThat(subpartitionView.isReleased()).isTrue();
+        assertThat(releaseDataViewFuture).isCompleted();
+    }
+
+    @Test
+    void testGetConsumingOffset() {
+        AtomicInteger nextBufferIndex = new AtomicInteger(0);
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+        TestingHsDataView diskDataView =
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (toConsumeBuffer) ->
+                                        Optional.of(
+                                                createBufferAndBacklog(
+                                                        0,
+                                                        DataType.DATA_BUFFER,
+                                                        nextBufferIndex.getAndIncrement())))
+                        .build();
+        subpartitionView.setDiskDataView(diskDataView);
+        subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
+
+        assertThat(subpartitionView.getConsumingOffset()).isEqualTo(-1);
+        subpartitionView.getNextBuffer();
+        assertThat(subpartitionView.getConsumingOffset()).isEqualTo(0);
+        subpartitionView.getNextBuffer();
+        assertThat(subpartitionView.getConsumingOffset()).isEqualTo(1);
+    }
+
+    @Test
+    void testSetDataViewRepeatedly() {
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+
+        subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
+        assertThatThrownBy(() -> subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("repeatedly set memory data view is not allowed.");
+
+        subpartitionView.setDiskDataView(TestingHsDataView.NO_OP);
+        assertThatThrownBy(() -> subpartitionView.setDiskDataView(TestingHsDataView.NO_OP))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("repeatedly set disk data view is not allowed.");
+    }
+
+    private static HsSubpartitionView createSubpartitionView() {
+        return new HsSubpartitionView(new NoOpBufferAvailablityListener());
+    }
+
+    private static HsSubpartitionView createSubpartitionView(
+            BufferAvailabilityListener bufferAvailabilityListener) {
+        return new HsSubpartitionView(bufferAvailabilityListener);
+    }
+
+    private static BufferAndBacklog createBufferAndBacklog(
+            int buffersInBacklog, DataType nextDataType, int sequenceNumber) {
+        final int bufferSize = 8;
+        Buffer buffer = HybridShuffleTestUtils.createBuffer(bufferSize, true);
+        return new BufferAndBacklog(buffer, buffersInBacklog, nextDataType, sequenceNumber);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingHsDataView.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingHsDataView.java
new file mode 100644
index 00000000000..343bd9586df
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingHsDataView.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/** Mock {@link HsDataView} for testing. */
+public class TestingHsDataView implements HsDataView {
+    public static final TestingHsDataView NO_OP = TestingHsDataView.builder().build();
+
+    private final FunctionWithException<
+                    Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>
+            consumeBufferFunction;
+
+    private final Function<Integer, Buffer.DataType> peekNextToConsumeDataTypeFunction;
+
+    private final Supplier<Integer> getBacklogSupplier;
+
+    private final Runnable releaseDataViewRunnable;
+
+    private TestingHsDataView(
+            FunctionWithException<Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>
+                    consumeBufferFunction,
+            Function<Integer, Buffer.DataType> peekNextToConsumeDataTypeFunction,
+            Supplier<Integer> getBacklogSupplier,
+            Runnable releaseDataViewRunnable) {
+        this.consumeBufferFunction = consumeBufferFunction;
+        this.peekNextToConsumeDataTypeFunction = peekNextToConsumeDataTypeFunction;
+        this.getBacklogSupplier = getBacklogSupplier;
+        this.releaseDataViewRunnable = releaseDataViewRunnable;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    @Override
+    public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(int nextBufferToConsume)
+            throws Throwable {
+        return consumeBufferFunction.apply(nextBufferToConsume);
+    }
+
+    @Override
+    public Buffer.DataType peekNextToConsumeDataType(int nextBufferToConsume) {
+        return peekNextToConsumeDataTypeFunction.apply(nextBufferToConsume);
+    }
+
+    @Override
+    public int getBacklog() {
+        return getBacklogSupplier.get();
+    }
+
+    @Override
+    public void releaseDataView() {
+        releaseDataViewRunnable.run();
+    }
+
+    /** Builder for {@link TestingHsDataView}. */
+    public static class Builder {
+        private FunctionWithException<
+                        Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>
+                consumeBufferFunction = (ignore) -> Optional.empty();
+
+        private Function<Integer, Buffer.DataType> peekNextToConsumeDataTypeFunction =
+                (ignore) -> Buffer.DataType.NONE;
+
+        private Supplier<Integer> getBacklogSupplier = () -> 0;
+
+        private Runnable releaseDataViewRunnable = () -> {};
+
+        private Builder() {}
+
+        public Builder setConsumeBufferFunction(
+                FunctionWithException<
+                                Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>
+                        consumeBufferFunction) {
+            this.consumeBufferFunction = consumeBufferFunction;
+            return this;
+        }
+
+        public Builder setPeekNextToConsumeDataTypeFunction(
+                Function<Integer, Buffer.DataType> peekNextToConsumeDataTypeFunction) {
+            this.peekNextToConsumeDataTypeFunction = peekNextToConsumeDataTypeFunction;
+            return this;
+        }
+
+        public Builder setGetBacklogSupplier(Supplier<Integer> getBacklogSupplier) {
+            this.getBacklogSupplier = getBacklogSupplier;
+            return this;
+        }
+
+        public Builder setReleaseDataViewRunnable(Runnable releaseDataViewRunnable) {
+            this.releaseDataViewRunnable = releaseDataViewRunnable;
+            return this;
+        }
+
+        public TestingHsDataView build() {
+            return new TestingHsDataView(
+                    consumeBufferFunction,
+                    peekNextToConsumeDataTypeFunction,
+                    getBacklogSupplier,
+                    releaseDataViewRunnable);
+        }
+    }
+}