Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/02 01:57:26 UTC
[flink] 11/13: [FLINK-27908] Introduce HsSubpartitionView based on HsDataView and its implementations.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d0b133814838e3614a41e70b5e6a27b80afa0967
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Jul 28 20:14:52 2022 +0800
[FLINK-27908] Introduce HsSubpartitionView based on HsDataView and its implementations.
---
.../io/network/partition/hybrid/HsDataView.java | 60 ++++
.../partition/hybrid/HsFileDataManager.java | 16 +-
.../partition/hybrid/HsMemoryDataManager.java | 23 +-
.../partition/hybrid/HsSubpartitionFileReader.java | 6 +-
.../hybrid/HsSubpartitionFileReaderImpl.java | 86 +++++-
.../hybrid/HsSubpartitionMemoryDataManager.java | 29 +-
.../partition/hybrid/HsSubpartitionView.java | 262 +++++++++++++++++
.../partition/hybrid/HsFileDataManagerTest.java | 27 +-
.../hybrid/HsSubpartitionFileReaderImplTest.java | 88 +++++-
.../HsSubpartitionMemoryDataManagerTest.java | 20 +-
.../partition/hybrid/HsSubpartitionViewTest.java | 313 +++++++++++++++++++++
.../partition/hybrid/TestingHsDataView.java | 127 +++++++++
12 files changed, 1011 insertions(+), 46 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java
new file mode 100644
index 00000000000..c3d4cb9ed83
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+
+import java.util.Optional;
+
+/**
+ * A view for {@link HsSubpartitionView} to find out what data exists in memory or on disk and to
+ * poll that data.
+ */
+public interface HsDataView {
+
+ /**
+ * Try to consume the next buffer.
+ *
+ * <p>Only invoked by the consumer thread.
+ *
+ * @param nextBufferToConsume next buffer index to consume.
+ * @return If the target buffer exists, return the buffer and its backlog, otherwise return
+ * {@link Optional#empty()}.
+ */
+ Optional<BufferAndBacklog> consumeBuffer(int nextBufferToConsume) throws Throwable;
+
+ /**
+ * Get the {@link DataType} of the next buffer to consume.
+ *
+ * @param nextBufferToConsume next buffer index to consume.
+ * @return next buffer's dataType. If the next buffer is not found, return {@link DataType#NONE}.
+ */
+ DataType peekNextToConsumeDataType(int nextBufferToConsume);
+
+ /**
+ * Get the number of buffers in the backlog.
+ *
+ * @return backlog of this view's corresponding subpartition.
+ */
+ int getBacklog();
+
+ /** Release this {@link HsDataView} when the related subpartition view is being released. */
+ void releaseDataView();
+}
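For illustration only (not part of the commit), here is a minimal sketch of how a consumer is expected to drive this interface: peek the next buffer's data type, then consume buffers in strictly increasing index order, stopping once the view reports DataType.NONE or an empty Optional. The helper class and its drain method are hypothetical; only the HsDataView methods and the BufferAndBacklog accessors come from this patch.

    import org.apache.flink.runtime.io.network.buffer.Buffer;
    import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;

    import java.util.Optional;

    /** Hypothetical example: drains an HsDataView following the contract documented above. */
    final class HsDataViewPollingExample {

        static void drain(HsDataView dataView) throws Throwable {
            int nextBufferIndex = 0;
            while (dataView.peekNextToConsumeDataType(nextBufferIndex) != Buffer.DataType.NONE) {
                // consumeBuffer returns empty if the buffer for this index is not available yet.
                Optional<BufferAndBacklog> bufferAndBacklog = dataView.consumeBuffer(nextBufferIndex);
                if (!bufferAndBacklog.isPresent()) {
                    break;
                }
                // Hand the buffer to the downstream consumer here, then recycle it.
                bufferAndBacklog.get().buffer().recycleBuffer();
                nextBufferIndex++;
            }
        }
    }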
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
index f37a1ff5164..6070a838110 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
@@ -147,7 +147,7 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
}
/** This method is only called by the result partition to create a subpartitionFileReader. */
- public HsSubpartitionFileReader registerNewSubpartition(
+ public HsDataView registerNewSubpartition(
int subpartitionId, HsSubpartitionViewInternalOperations operation) throws IOException {
synchronized (lock) {
checkState(!isReleased, "HsFileDataManager is already released.");
@@ -159,7 +159,8 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
dataFileChannel,
operation,
dataIndex,
- hybridShuffleConfiguration.getMaxBuffersReadAhead());
+ hybridShuffleConfiguration.getMaxBuffersReadAhead(),
+ this::releaseSubpartitionReader);
allReaders.add(subpartitionReader);
@@ -172,6 +173,17 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
IOUtils.deleteFileQuietly(dataFilePath);
}
+ /**
+ * Release the given {@link HsSubpartitionFileReader} from the {@link HsFileDataManager}.
+ *
+ * @param subpartitionFileReader the subpartition file reader to release.
+ */
+ public void releaseSubpartitionReader(HsSubpartitionFileReader subpartitionFileReader) {
+ synchronized (lock) {
+ removeSubpartitionReaders(Collections.singleton(subpartitionFileReader));
+ }
+ }
+
/** Releases this file data manager and deletes shuffle data after all readers are removed. */
public void release() {
synchronized (lock) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
index 9bf57b51c4b..11228fe7206 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
@@ -121,7 +121,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
* #subpartitionViewOperationsMap}. It is used to obtain the consumption progress of the
* subpartition.
*/
- public void registerSubpartitionView(
+ public HsDataView registerSubpartitionView(
int subpartitionId, HsSubpartitionViewInternalOperations viewOperations) {
HsSubpartitionViewInternalOperations oldView =
subpartitionViewOperationsMap.put(subpartitionId, viewOperations);
@@ -130,6 +130,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
"subpartition : {} register subpartition view will replace old view. ",
subpartitionId);
}
+ return getSubpartitionMemoryDataManager(subpartitionId);
}
/** Close this {@link HsMemoryDataManager}, it means no data can append to memory. */
@@ -318,24 +319,4 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
lock.unlock();
}
}
-
- /** Integrate the buffer and dataType of next buffer. */
- public static class BufferAndNextDataType {
- private final Buffer buffer;
-
- private final Buffer.DataType nextDataType;
-
- public BufferAndNextDataType(Buffer buffer, Buffer.DataType nextDataType) {
- this.buffer = buffer;
- this.nextDataType = nextDataType;
- }
-
- public Buffer getBuffer() {
- return buffer;
- }
-
- public Buffer.DataType getNextDataType() {
- return nextDataType;
- }
- }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java
index e582be2a620..fbc044d6da0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.Queue;
+import java.util.function.Consumer;
/**
* This component is responsible for reading data from disk for a specific subpartition.
@@ -31,7 +32,7 @@ import java.util.Queue;
* <p>In order to access the disk as sequentially as possible {@link HsSubpartitionFileReader} need
* to be able to compare priorities.
*/
-public interface HsSubpartitionFileReader extends Comparable<HsSubpartitionFileReader> {
+public interface HsSubpartitionFileReader extends Comparable<HsSubpartitionFileReader>, HsDataView {
/** Do prep work before this {@link HsSubpartitionFileReader} is scheduled to read data. */
void prepareForScheduling();
@@ -58,6 +59,7 @@ public interface HsSubpartitionFileReader extends Comparable<HsSubpartitionFileR
FileChannel dataFileChannel,
HsSubpartitionViewInternalOperations operation,
HsFileDataIndex dataIndex,
- int maxBuffersReadAhead);
+ int maxBuffersReadAhead,
+ Consumer<HsSubpartitionFileReader> fileReaderReleaser);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
index ecf02260dd1..a355765ac35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
@@ -23,6 +23,8 @@ import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
+import org.apache.flink.util.ExceptionUtils;
import javax.annotation.Nullable;
@@ -34,6 +36,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.function.Consumer;
import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.positionToNextBuffer;
import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.readFromByteChannel;
@@ -62,6 +65,8 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
private final Deque<BufferIndexOrError> loadedBuffers = new LinkedBlockingDeque<>();
+ private final Consumer<HsSubpartitionFileReader> fileReaderReleaser;
+
private volatile boolean isFailed;
public HsSubpartitionFileReaderImpl(
@@ -69,12 +74,14 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
FileChannel dataFileChannel,
HsSubpartitionViewInternalOperations operations,
HsFileDataIndex dataIndex,
- int maxBufferReadAhead) {
+ int maxBufferReadAhead,
+ Consumer<HsSubpartitionFileReader> fileReaderReleaser) {
this.subpartitionId = subpartitionId;
this.dataFileChannel = dataFileChannel;
this.operations = operations;
this.bufferIndexManager = new BufferIndexManager(maxBufferReadAhead);
this.cachedRegionManager = new CachedRegionManager(subpartitionId, dataIndex);
+ this.fileReaderReleaser = fileReaderReleaser;
}
@Override
@@ -192,10 +199,77 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
return loadedBuffers;
}
+ @Override
+ public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(int nextBufferToConsume)
+ throws Throwable {
+ if (!checkAndGetFirstBufferIndexOrError(nextBufferToConsume).isPresent()) {
+ return Optional.empty();
+ }
+
+ // We already ensured that the peek element is neither null nor a throwable.
+ BufferIndexOrError current = checkNotNull(loadedBuffers.poll());
+
+ BufferIndexOrError next = loadedBuffers.peek();
+
+ Buffer.DataType nextDataType = next == null ? Buffer.DataType.NONE : next.getDataType();
+ int backlog = loadedBuffers.size();
+ int bufferIndex = current.getIndex();
+ Buffer buffer =
+ current.getBuffer()
+ .orElseThrow(
+ () ->
+ new NullPointerException(
+ "Get a non-throwable and non-buffer bufferIndexOrError, which is not allowed"));
+ return Optional.of(
+ ResultSubpartition.BufferAndBacklog.fromBufferAndLookahead(
+ buffer, nextDataType, backlog, bufferIndex));
+ }
+
+ @Override
+ public Buffer.DataType peekNextToConsumeDataType(int nextBufferToConsume) {
+ Buffer.DataType dataType = Buffer.DataType.NONE;
+ try {
+ dataType =
+ checkAndGetFirstBufferIndexOrError(nextBufferToConsume)
+ .map(BufferIndexOrError::getDataType)
+ .orElse(Buffer.DataType.NONE);
+ } catch (Throwable throwable) {
+ ExceptionUtils.rethrow(throwable);
+ }
+ return dataType;
+ }
+
+ @Override
+ public void releaseDataView() {
+ fileReaderReleaser.accept(this);
+ }
+
+ @Override
+ public int getBacklog() {
+ return loadedBuffers.size();
+ }
+
// ------------------------------------------------------------------------
// Internal Methods
// ------------------------------------------------------------------------
+ private Optional<BufferIndexOrError> checkAndGetFirstBufferIndexOrError(int expectedBufferIndex)
+ throws Throwable {
+ if (loadedBuffers.isEmpty()) {
+ return Optional.empty();
+ }
+
+ BufferIndexOrError peek = loadedBuffers.peek();
+
+ if (peek.getThrowable().isPresent()) {
+ throw peek.getThrowable().get();
+ } else if (peek.getIndex() != expectedBufferIndex) {
+ return Optional.empty();
+ }
+
+ return Optional.of(peek);
+ }
+
private void moveFileOffsetToBuffer(int bufferIndex) throws IOException {
Tuple2<Integer, Long> indexAndOffset =
cachedRegionManager.getNumSkipAndFileOffset(bufferIndex);
@@ -403,9 +477,15 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
FileChannel dataFileChannel,
HsSubpartitionViewInternalOperations operation,
HsFileDataIndex dataIndex,
- int maxBuffersReadAhead) {
+ int maxBuffersReadAhead,
+ Consumer<HsSubpartitionFileReader> fileReaderReleaser) {
return new HsSubpartitionFileReaderImpl(
- subpartitionId, dataFileChannel, operation, dataIndex, maxBuffersReadAhead);
+ subpartitionId,
+ dataFileChannel,
+ operation,
+ dataIndex,
+ maxBuffersReadAhead,
+ fileReaderReleaser);
}
}
}
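The fileReaderReleaser callback added above decouples the reader from HsFileDataManager: releaseDataView() simply hands the reader back to whoever created it. Below is a self-contained sketch of that pattern; the Reader and ReleaserCallbackSketch names are hypothetical and do not appear in the commit.

    import java.util.HashSet;
    import java.util.Set;
    import java.util.function.Consumer;

    final class ReleaserCallbackSketch {

        /** Mirrors HsSubpartitionFileReaderImpl: the reader only knows a Consumer, not the manager. */
        static final class Reader {
            private final Consumer<Reader> releaser;

            Reader(Consumer<Reader> releaser) {
                this.releaser = releaser;
            }

            void releaseDataView() {
                // Delegate the actual de-registration back to whoever created this reader.
                releaser.accept(this);
            }
        }

        public static void main(String[] args) {
            Set<Reader> allReaders = new HashSet<>();
            // The manager hands over a method reference at registration time, mirroring
            // HsFileDataManager#registerNewSubpartition passing this::releaseSubpartitionReader.
            Reader reader = new Reader(allReaders::remove);
            allReaders.add(reader);

            reader.releaseDataView();
            System.out.println("readers left: " + allReaders.size()); // prints "readers left: 0"
        }
    }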
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
index b5079e7df21..17b7e833e29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.ConsumeStatus;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.SpillStatus;
import org.apache.flink.util.function.SupplierWithException;
@@ -54,7 +55,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* This class is responsible for managing the data in a single subpartition. One {@link
* HsMemoryDataManager} will hold multiple {@link HsSubpartitionMemoryDataManager}.
*/
-public class HsSubpartitionMemoryDataManager {
+public class HsSubpartitionMemoryDataManager implements HsDataView {
private final int targetChannel;
private final int bufferSize;
@@ -108,22 +109,24 @@ public class HsSubpartitionMemoryDataManager {
@SuppressWarnings("FieldAccessNotGuarded")
// Note that: callWithLock ensure that code block guarded by resultPartitionReadLock and
// subpartitionLock.
+ @Override
public DataType peekNextToConsumeDataType(int nextToConsumeIndex) {
return callWithLock(() -> peekNextToConsumeDataTypeInternal(nextToConsumeIndex));
}
/**
* Check whether the head of {@link #unConsumedBuffers} is the buffer to be consumed. If so,
- * return the buffer and next data type.
+ * return the buffer and backlog.
*
* @param toConsumeIndex index of buffer to be consumed.
* @return If the head of {@link #unConsumedBuffers} is target, return optional of the buffer
- * and next data type. Otherwise, return {@link Optional#empty()}.
+ * and backlog. Otherwise, return {@link Optional#empty()}.
*/
@SuppressWarnings("FieldAccessNotGuarded")
// Note that: callWithLock ensure that code block guarded by resultPartitionReadLock and
// subpartitionLock.
- public Optional<HsMemoryDataManager.BufferAndNextDataType> consumeBuffer(int toConsumeIndex) {
+ @Override
+ public Optional<BufferAndBacklog> consumeBuffer(int toConsumeIndex) {
Optional<Tuple2<HsBufferContext, DataType>> bufferAndNextDataType =
callWithLock(
() -> {
@@ -145,8 +148,22 @@ public class HsSubpartitionMemoryDataManager {
tuple.f0.getBufferIndexAndChannel()));
return bufferAndNextDataType.map(
tuple ->
- new HsMemoryDataManager.BufferAndNextDataType(
- tuple.f0.getBuffer(), tuple.f1));
+ new BufferAndBacklog(
+ tuple.f0.getBuffer(), getBacklog(), tuple.f1, toConsumeIndex));
+ }
+
+ @SuppressWarnings("FieldAccessNotGuarded")
+ // Get the size of unConsumedBuffers without synchronization to provide the memory data backlog.
+ // This may make the result greater than or equal to the actual backlog, but obtaining an accurate
+ // backlog would bring too much extra overhead.
+ @Override
+ public int getBacklog() {
+ return unConsumedBuffers.size();
+ }
+
+ @Override
+ public void releaseDataView() {
+ // nothing to do for memory data.
}
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
new file mode 100644
index 00000000000..5aa7e8bac0b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The read view of HsResultPartition; data can be read from memory or disk. */
+public class HsSubpartitionView
+ implements ResultSubpartitionView, HsSubpartitionViewInternalOperations {
+ private final BufferAvailabilityListener availabilityListener;
+ private final Object lock = new Object();
+
+ /** Index of last consumed buffer. */
+ @GuardedBy("lock")
+ private int lastConsumedBufferIndex = -1;
+
+ @GuardedBy("lock")
+ private boolean needNotify = false;
+
+ @Nullable
+ @GuardedBy("lock")
+ private Buffer.DataType cachedNextDataType = null;
+
+ @Nullable
+ @GuardedBy("lock")
+ private Throwable failureCause = null;
+
+ @GuardedBy("lock")
+ private boolean isReleased = false;
+
+ @Nullable
+ @GuardedBy("lock")
+ // diskDataView can be null only before initialization.
+ private HsDataView diskDataView;
+
+ @Nullable
+ @GuardedBy("lock")
+ // memoryDataView can be null only before initialization.
+ private HsDataView memoryDataView;
+
+ public HsSubpartitionView(BufferAvailabilityListener availabilityListener) {
+ this.availabilityListener = availabilityListener;
+ }
+
+ @Nullable
+ @Override
+ public BufferAndBacklog getNextBuffer() {
+ synchronized (lock) {
+ try {
+ checkNotNull(diskDataView, "disk data view must not be null.");
+ checkNotNull(memoryDataView, "memory data view must not be null.");
+
+ Optional<BufferAndBacklog> bufferToConsume = tryReadFromDisk();
+ if (!bufferToConsume.isPresent()) {
+ bufferToConsume = memoryDataView.consumeBuffer(lastConsumedBufferIndex + 1);
+ }
+ updateConsumingStatus(bufferToConsume);
+ return bufferToConsume.orElse(null);
+ } catch (Throwable cause) {
+ releaseInternal(cause);
+ return null;
+ }
+ }
+ }
+
+ @Override
+ public void notifyDataAvailable() {
+ boolean notifyDownStream = false;
+ synchronized (lock) {
+ if (isReleased) {
+ return;
+ }
+ if (needNotify) {
+ notifyDownStream = true;
+ needNotify = false;
+ }
+ }
+ // notify outside of lock to avoid deadlock
+ if (notifyDownStream) {
+ availabilityListener.notifyDataAvailable();
+ }
+ }
+
+ @Override
+ public AvailabilityWithBacklog getAvailabilityAndBacklog(int numCreditsAvailable) {
+ synchronized (lock) {
+ boolean availability = numCreditsAvailable > 0;
+ if (numCreditsAvailable <= 0
+ && cachedNextDataType != null
+ && cachedNextDataType == Buffer.DataType.EVENT_BUFFER) {
+ availability = true;
+ }
+ return new AvailabilityWithBacklog(availability, diskDataView.getBacklog());
+ }
+ }
+
+ @Override
+ public void releaseAllResources() throws IOException {
+ releaseInternal(null);
+ }
+
+ @Override
+ public boolean isReleased() {
+ synchronized (lock) {
+ return isReleased;
+ }
+ }
+
+ @Override
+ public int getConsumingOffset() {
+ synchronized (lock) {
+ return lastConsumedBufferIndex;
+ }
+ }
+
+ @Override
+ public Throwable getFailureCause() {
+ synchronized (lock) {
+ return failureCause;
+ }
+ }
+
+ /**
+ * Set the disk {@link HsDataView} for this subpartition. This method is only called when the
+ * {@link HsSubpartitionFileReader} is being created.
+ */
+ void setDiskDataView(HsDataView diskDataView) {
+ synchronized (lock) {
+ checkState(this.diskDataView == null, "repeatedly set disk data view is not allowed.");
+ this.diskDataView = diskDataView;
+ }
+ }
+
+ /**
+ * Set the memory {@link HsDataView} for this subpartition. This method is only called when the
+ * {@link HsSubpartitionFileReader} is being created.
+ */
+ void setMemoryDataView(HsDataView memoryDataView) {
+ synchronized (lock) {
+ checkState(
+ this.memoryDataView == null, "repeatedly set memory data view is not allowed.");
+ this.memoryDataView = memoryDataView;
+ }
+ }
+
+ @Override
+ public void resumeConsumption() {
+ throw new UnsupportedOperationException("resumeConsumption should never be called.");
+ }
+
+ @Override
+ public void acknowledgeAllDataProcessed() {
+ // in case of bounded partitions there is no upstream to acknowledge, we simply ignore
+ // the ack, as there are no checkpoints
+ }
+
+ @SuppressWarnings("FieldAccessNotGuarded")
+ @Override
+ public int unsynchronizedGetNumberOfQueuedBuffers() {
+ return diskDataView.getBacklog();
+ }
+
+ @SuppressWarnings("FieldAccessNotGuarded")
+ @Override
+ public int getNumberOfQueuedBuffers() {
+ return diskDataView.getBacklog();
+ }
+
+ @Override
+ public void notifyNewBufferSize(int newBufferSize) {
+ throw new UnsupportedOperationException("Method should never be called.");
+ }
+
+ // -------------------------------
+ // Internal Methods
+ // -------------------------------
+
+ @GuardedBy("lock")
+ private Optional<BufferAndBacklog> tryReadFromDisk() throws Throwable {
+ final int nextBufferIndexToConsume = lastConsumedBufferIndex + 1;
+ return checkNotNull(diskDataView)
+ .consumeBuffer(nextBufferIndexToConsume)
+ .map(
+ bufferAndBacklog -> {
+ if (bufferAndBacklog.getNextDataType() == Buffer.DataType.NONE) {
+ return new BufferAndBacklog(
+ bufferAndBacklog.buffer(),
+ bufferAndBacklog.buffersInBacklog(),
+ checkNotNull(memoryDataView)
+ .peekNextToConsumeDataType(
+ nextBufferIndexToConsume + 1),
+ bufferAndBacklog.getSequenceNumber());
+ }
+ return bufferAndBacklog;
+ });
+ }
+
+ @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+ @GuardedBy("lock")
+ private void updateConsumingStatus(Optional<BufferAndBacklog> bufferAndBacklog) {
+ assert Thread.holdsLock(lock);
+ // if consumed, update and check consume offset
+ if (bufferAndBacklog.isPresent()) {
+ ++lastConsumedBufferIndex;
+ checkState(bufferAndBacklog.get().getSequenceNumber() == lastConsumedBufferIndex);
+ }
+
+ // update need-notify
+ boolean dataAvailable =
+ bufferAndBacklog.map(BufferAndBacklog::isDataAvailable).orElse(false);
+ needNotify = !dataAvailable;
+ // update cached next data type
+ cachedNextDataType = bufferAndBacklog.map(BufferAndBacklog::getNextDataType).orElse(null);
+ }
+
+ private void releaseInternal(@Nullable Throwable throwable) {
+ boolean releaseSubpartitionReader = false;
+ synchronized (lock) {
+ if (isReleased) {
+ return;
+ }
+ isReleased = true;
+ failureCause = throwable;
+ if (diskDataView != null) {
+ releaseSubpartitionReader = true;
+ }
+ }
+ // release subpartition reader outside of lock to avoid deadlock.
+ if (releaseSubpartitionReader) {
+ //noinspection FieldAccessNotGuarded
+ diskDataView.releaseDataView();
+ }
+ }
+}
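To see how the pieces introduced in this commit fit together, here is a hypothetical wiring sketch; the createAndWireView helper and its caller are assumptions, not part of this patch. It relies only on registerSubpartitionView and registerNewSubpartition now returning HsDataView and on the package-private setMemoryDataView/setDiskDataView above, so it would have to live in the same package.

    package org.apache.flink.runtime.io.network.partition.hybrid;

    import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;

    import java.io.IOException;

    /** Hypothetical helper showing how a result partition could wire up the new view. */
    final class HsSubpartitionViewWiringSketch {

        static HsSubpartitionView createAndWireView(
                HsMemoryDataManager memoryDataManager,
                HsFileDataManager fileDataManager,
                int subpartitionId,
                BufferAvailabilityListener availabilityListener)
                throws IOException {
            HsSubpartitionView view = new HsSubpartitionView(availabilityListener);
            // Both register methods hand back an HsDataView for the view to consume from;
            // the view itself is registered as the HsSubpartitionViewInternalOperations callback.
            HsDataView memoryDataView =
                    memoryDataManager.registerSubpartitionView(subpartitionId, view);
            HsDataView diskDataView =
                    fileDataManager.registerNewSubpartition(subpartitionId, view);
            view.setMemoryDataView(memoryDataView);
            view.setDiskDataView(diskDataView);
            return view;
        }
    }

Once wired, getNextBuffer() tries the disk view first and falls back to memory, as implemented in tryReadFromDisk above.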
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
index 1a39e14ca02..1a2e28959a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.io.network.partition.hybrid;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.function.BiConsumerWithException;
@@ -40,6 +42,7 @@ import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.ArrayDeque;
+import java.util.Optional;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
@@ -396,6 +399,27 @@ class HsFileDataManagerTest {
this.failConsumer = failConsumer;
}
+ @Override
+ public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(
+ int nextBufferToConsume) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Buffer.DataType peekNextToConsumeDataType(int nextBufferToConsume) {
+ return Buffer.DataType.NONE;
+ }
+
+ @Override
+ public int getBacklog() {
+ return 0;
+ }
+
+ @Override
+ public void releaseDataView() {
+ // do nothing.
+ }
+
/** Factory for {@link TestingHsSubpartitionFileReader}. */
private static class Factory implements HsSubpartitionFileReader.Factory {
private final Queue<HsSubpartitionFileReader> allReaders = new ArrayDeque<>();
@@ -406,7 +430,8 @@ class HsFileDataManagerTest {
FileChannel dataFileChannel,
HsSubpartitionViewInternalOperations operation,
HsFileDataIndex dataIndex,
- int maxBuffersReadAhead) {
+ int maxBuffersReadAhead,
+ Consumer<HsSubpartitionFileReader> fileReaderReleaser) {
return checkNotNull(allReaders.poll());
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
index 36a0833163d..d25a5a34fb3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
@@ -360,6 +361,86 @@ class HsSubpartitionFileReaderImplTest {
assertThat(fileReader1).isGreaterThan(fileReader2);
}
+ @Test
+ void testConsumeBuffer() throws Throwable {
+ TestingSubpartitionViewInternalOperation viewNotifier =
+ new TestingSubpartitionViewInternalOperation();
+ HsSubpartitionFileReaderImpl subpartitionFileReader =
+ createSubpartitionFileReader(0, viewNotifier);
+
+ // if there is no preloaded data in the file reader, return Optional.empty.
+ assertThat(subpartitionFileReader.consumeBuffer(0)).isNotPresent();
+
+ // buffers in file: (0-0, 0-1)
+ writeDataToFile(0, 0, 0, 2);
+
+ Queue<MemorySegment> memorySegments = createsMemorySegments(2);
+ // trigger reading, add buffer to queue.
+ subpartitionFileReader.readBuffers(memorySegments, (ignore) -> {});
+
+ // if nextBufferToConsume is not equal to the peek element's index, return Optional.empty.
+ assertThat(subpartitionFileReader.consumeBuffer(10)).isNotPresent();
+
+ assertThat(subpartitionFileReader.consumeBuffer(0))
+ .hasValueSatisfying(
+ (bufferAndBacklog -> {
+ assertThat(bufferAndBacklog.getNextDataType())
+ .isEqualTo(DataType.EVENT_BUFFER);
+ assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(0);
+ // first buffer's data is 0.
+ assertThat(
+ bufferAndBacklog
+ .buffer()
+ .getNioBufferReadable()
+ .order(ByteOrder.nativeOrder())
+ .getInt())
+ .isEqualTo(0);
+ }));
+ }
+
+ @Test
+ void testPeekNextToConsumeDataTypeOrConsumeBufferThrowException() {
+ TestingSubpartitionViewInternalOperation viewNotifier =
+ new TestingSubpartitionViewInternalOperation();
+ HsSubpartitionFileReaderImpl subpartitionFileReader =
+ createSubpartitionFileReader(0, viewNotifier);
+
+ subpartitionFileReader.fail(new RuntimeException("expected exception."));
+
+ assertThatThrownBy(() -> subpartitionFileReader.peekNextToConsumeDataType(0))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("expected exception.");
+
+ assertThatThrownBy(() -> subpartitionFileReader.consumeBuffer(0))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("expected exception.");
+ }
+
+ @Test
+ void testPeekNextToConsumeDataType() throws Throwable {
+ TestingSubpartitionViewInternalOperation viewNotifier =
+ new TestingSubpartitionViewInternalOperation();
+ HsSubpartitionFileReaderImpl subpartitionFileReader =
+ createSubpartitionFileReader(0, viewNotifier);
+
+ // if there is no preloaded data in the file reader, return DataType.NONE.
+ assertThat(subpartitionFileReader.peekNextToConsumeDataType(0)).isEqualTo(DataType.NONE);
+
+ // buffers in file: (0-0, 0-1)
+ writeDataToFile(0, 0, 2);
+
+ Queue<MemorySegment> memorySegments = createsMemorySegments(2);
+ // trigger reading, add buffer to queue.
+ subpartitionFileReader.readBuffers(memorySegments, (ignore) -> {});
+
+ // if nextBufferToConsume is not equal to the peek element's index, return DataType.NONE.
+ assertThat(subpartitionFileReader.peekNextToConsumeDataType(10)).isEqualTo(DataType.NONE);
+
+ // if nextBufferToConsume is equal to the peek element's index, return the real DataType.
+ assertThat(subpartitionFileReader.peekNextToConsumeDataType(0))
+ .isEqualTo(DataType.DATA_BUFFER);
+ }
+
private static void checkData(HsSubpartitionFileReaderImpl fileReader, int... expectedData) {
assertThat(fileReader.getLoadedBuffers()).hasSameSizeAs(expectedData);
for (int data : expectedData) {
@@ -383,7 +464,12 @@ class HsSubpartitionFileReaderImplTest {
private HsSubpartitionFileReaderImpl createSubpartitionFileReader(
int targetChannel, HsSubpartitionViewInternalOperations operations) {
return new HsSubpartitionFileReaderImpl(
- targetChannel, dataFileChannel, operations, diskIndex, MAX_BUFFERS_READ_AHEAD);
+ targetChannel,
+ dataFileChannel,
+ operations,
+ diskIndex,
+ MAX_BUFFERS_READ_AHEAD,
+ (ignore) -> {});
}
private static FileChannel openFileChannel(Path path) throws IOException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java
index a15dfddc9f2..f1b16d0d231 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManager.BufferAndNextDataType;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.ConsumeStatus;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.SpillStatus;
@@ -372,14 +372,14 @@ class HsSubpartitionMemoryDataManagerTest {
private static void checkConsumedBufferAndNextDataType(
List<Tuple2<Integer, Buffer.DataType>> expectedRecords,
- List<Optional<BufferAndNextDataType>> bufferAndNextDataTypesOpt) {
- checkArgument(expectedRecords.size() == bufferAndNextDataTypesOpt.size());
- for (int i = 0; i < bufferAndNextDataTypesOpt.size(); i++) {
+ List<Optional<BufferAndBacklog>> bufferAndBacklogOpt) {
+ checkArgument(expectedRecords.size() == bufferAndBacklogOpt.size());
+ for (int i = 0; i < bufferAndBacklogOpt.size(); i++) {
final int index = i;
- assertThat(bufferAndNextDataTypesOpt.get(index))
+ assertThat(bufferAndBacklogOpt.get(index))
.hasValueSatisfying(
- (bufferAndNextDataType -> {
- Buffer buffer = bufferAndNextDataType.getBuffer();
+ (bufferAndBacklog -> {
+ Buffer buffer = bufferAndBacklog.buffer();
int value =
buffer.getNioBufferReadable()
.order(ByteOrder.LITTLE_ENDIAN)
@@ -387,11 +387,11 @@ class HsSubpartitionMemoryDataManagerTest {
Buffer.DataType dataType = buffer.getDataType();
assertThat(value).isEqualTo(expectedRecords.get(index).f0);
assertThat(dataType).isEqualTo(expectedRecords.get(index).f1);
- if (index != bufferAndNextDataTypesOpt.size() - 1) {
- assertThat(bufferAndNextDataType.getNextDataType())
+ if (index != bufferAndBacklogOpt.size() - 1) {
+ assertThat(bufferAndBacklog.getNextDataType())
.isEqualTo(expectedRecords.get(index + 1).f1);
} else {
- assertThat(bufferAndNextDataType.getNextDataType())
+ assertThat(bufferAndBacklog.getNextDataType())
.isEqualTo(Buffer.DataType.NONE);
}
}));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
new file mode 100644
index 00000000000..77a797a80f1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView.AvailabilityWithBacklog;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link HsSubpartitionView}. */
+class HsSubpartitionViewTest {
+ @Test
+ void testGetNextBufferFromDisk() {
+ HsSubpartitionView subpartitionView = createSubpartitionView();
+
+ BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(0, DataType.DATA_BUFFER, 0);
+ CompletableFuture<Void> consumeBufferFromMemoryFuture = new CompletableFuture<>();
+ TestingHsDataView diskDataView =
+ TestingHsDataView.builder()
+ .setConsumeBufferFunction(
+ (bufferToConsume) -> Optional.of(bufferAndBacklog))
+ .build();
+ TestingHsDataView memoryDataView =
+ TestingHsDataView.builder()
+ .setConsumeBufferFunction(
+ (ignore) -> {
+ consumeBufferFromMemoryFuture.complete(null);
+ return Optional.empty();
+ })
+ .build();
+ subpartitionView.setDiskDataView(diskDataView);
+ subpartitionView.setMemoryDataView(memoryDataView);
+
+ BufferAndBacklog nextBuffer = subpartitionView.getNextBuffer();
+ assertThat(consumeBufferFromMemoryFuture).isNotCompleted();
+ assertThat(nextBuffer).isSameAs(bufferAndBacklog);
+ }
+
+ @Test
+ void testGetNextBufferFromDiskNextDataTypeIsNone() {
+ HsSubpartitionView subpartitionView = createSubpartitionView();
+ BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(0, DataType.NONE, 0);
+
+ TestingHsDataView diskDataView =
+ TestingHsDataView.builder()
+ .setConsumeBufferFunction(
+ (bufferToConsume) -> Optional.of(bufferAndBacklog))
+ .build();
+
+ TestingHsDataView memoryDataView =
+ TestingHsDataView.builder()
+ .setPeekNextToConsumeDataTypeFunction(
+ (bufferToConsume) -> {
+ assertThat(bufferToConsume).isEqualTo(1);
+ return DataType.EVENT_BUFFER;
+ })
+ .build();
+ subpartitionView.setDiskDataView(diskDataView);
+ subpartitionView.setMemoryDataView(memoryDataView);
+
+ BufferAndBacklog nextBuffer = subpartitionView.getNextBuffer();
+ assertThat(nextBuffer).isNotNull();
+ assertThat(nextBuffer.buffer()).isSameAs(bufferAndBacklog.buffer());
+ assertThat(nextBuffer.buffersInBacklog()).isEqualTo(bufferAndBacklog.buffersInBacklog());
+ assertThat(nextBuffer.getSequenceNumber()).isEqualTo(bufferAndBacklog.getSequenceNumber());
+ assertThat(nextBuffer.getNextDataType()).isEqualTo(DataType.EVENT_BUFFER);
+ }
+
+ @Test
+ void testGetNextBufferFromMemory() {
+ HsSubpartitionView subpartitionView = createSubpartitionView();
+
+ BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(0, DataType.DATA_BUFFER, 0);
+ TestingHsDataView memoryDataView =
+ TestingHsDataView.builder()
+ .setConsumeBufferFunction(
+ (bufferToConsume) -> Optional.of(bufferAndBacklog))
+ .build();
+ TestingHsDataView diskDataView =
+ TestingHsDataView.builder()
+ .setConsumeBufferFunction((bufferToConsume) -> Optional.empty())
+ .build();
+ subpartitionView.setDiskDataView(diskDataView);
+ subpartitionView.setMemoryDataView(memoryDataView);
+
+ BufferAndBacklog nextBuffer = subpartitionView.getNextBuffer();
+ assertThat(nextBuffer).isSameAs(bufferAndBacklog);
+ }
+
+ @Test
+ void testGetNextBufferThrowException() {
+ HsSubpartitionView subpartitionView = createSubpartitionView();
+
+ TestingHsDataView diskDataView =
+ TestingHsDataView.builder()
+ .setConsumeBufferFunction(
+ (nextToConsume) -> {
+ throw new RuntimeException("expected exception.");
+ })
+ .build();
+ subpartitionView.setDiskDataView(diskDataView);
+ subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
+
+ subpartitionView.getNextBuffer();
+ assertThat(subpartitionView.getFailureCause())
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("expected exception.");
+ assertThat(subpartitionView.isReleased()).isTrue();
+ }
+
+ @Test
+ void testNotifyDataAvailableNeedNotify() {
+ CompletableFuture<Void> notifyAvailableFuture = new CompletableFuture<>();
+ HsSubpartitionView subpartitionView =
+ createSubpartitionView(() -> notifyAvailableFuture.complete(null));
+
+ TestingHsDataView memoryDataView =
+ TestingHsDataView.builder()
+ .setConsumeBufferFunction(
+ (bufferToConsume) ->
+ Optional.of(createBufferAndBacklog(0, DataType.NONE, 0)))
+ .build();
+ subpartitionView.setMemoryDataView(memoryDataView);
+ subpartitionView.setDiskDataView(TestingHsDataView.NO_OP);
+
+ subpartitionView.getNextBuffer();
+ subpartitionView.notifyDataAvailable();
+ assertThat(notifyAvailableFuture).isCompleted();
+ }
+
+ @Test
+ void testNotifyDataAvailableNotNeedNotify() {
+ CompletableFuture<Void> notifyAvailableFuture = new CompletableFuture<>();
+ HsSubpartitionView subpartitionView =
+ createSubpartitionView(() -> notifyAvailableFuture.complete(null));
+
+ TestingHsDataView memoryDataView =
+ TestingHsDataView.builder()
+ .setConsumeBufferFunction(
+ (bufferToConsume) ->
+ Optional.of(
+ createBufferAndBacklog(0, DataType.DATA_BUFFER, 0)))
+ .build();
+ subpartitionView.setMemoryDataView(memoryDataView);
+ subpartitionView.setDiskDataView(TestingHsDataView.NO_OP);
+
+ subpartitionView.getNextBuffer();
+ subpartitionView.notifyDataAvailable();
+ assertThat(notifyAvailableFuture).isNotCompleted();
+ }
+
+ @Test
+ void testGetAvailabilityAndBacklogPositiveCredit() {
+ HsSubpartitionView subpartitionView = createSubpartitionView();
+ subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
+
+ final int backlog = 2;
+ subpartitionView.setDiskDataView(
+ TestingHsDataView.builder().setGetBacklogSupplier(() -> backlog).build());
+ AvailabilityWithBacklog availabilityAndBacklog =
+ subpartitionView.getAvailabilityAndBacklog(1);
+ assertThat(availabilityAndBacklog.getBacklog()).isEqualTo(backlog);
+ // positive credit always available.
+ assertThat(availabilityAndBacklog.isAvailable()).isTrue();
+ }
+
+ @Test
+ void testGetAvailabilityAndBacklogNonPositiveCreditNextIsData() {
+ final int backlog = 2;
+
+ HsSubpartitionView subpartitionView = createSubpartitionView();
+ subpartitionView.setMemoryDataView(
+ TestingHsDataView.builder()
+ .setConsumeBufferFunction(
+ (nextToConsume) ->
+ Optional.of(
+ createBufferAndBacklog(
+ backlog, DataType.DATA_BUFFER, 0)))
+ .build());
+ subpartitionView.setDiskDataView(
+ TestingHsDataView.builder().setGetBacklogSupplier(() -> backlog).build());
+
+ subpartitionView.getNextBuffer();
+
+ AvailabilityWithBacklog availabilityAndBacklog =
+ subpartitionView.getAvailabilityAndBacklog(0);
+ assertThat(availabilityAndBacklog.getBacklog()).isEqualTo(backlog);
+ // if credit is non-positive, only event can be available.
+ assertThat(availabilityAndBacklog.isAvailable()).isFalse();
+ }
+
+ @Test
+ void testGetAvailabilityAndBacklogNonPositiveCreditNextIsEvent() {
+ final int backlog = 2;
+
+ HsSubpartitionView subpartitionView = createSubpartitionView();
+ subpartitionView.setMemoryDataView(
+ TestingHsDataView.builder()
+ .setConsumeBufferFunction(
+ (nextToConsume) ->
+ Optional.of(
+ createBufferAndBacklog(
+ backlog, DataType.EVENT_BUFFER, 0)))
+ .build());
+ subpartitionView.setDiskDataView(
+ TestingHsDataView.builder().setGetBacklogSupplier(() -> backlog).build());
+
+ subpartitionView.getNextBuffer();
+
+ AvailabilityWithBacklog availabilityAndBacklog =
+ subpartitionView.getAvailabilityAndBacklog(0);
+ assertThat(availabilityAndBacklog.getBacklog()).isEqualTo(backlog);
+ // if credit is non-positive, only event can be available.
+ assertThat(availabilityAndBacklog.isAvailable()).isTrue();
+ }
+
+ @Test
+ void testRelease() throws Exception {
+ HsSubpartitionView subpartitionView = createSubpartitionView();
+ CompletableFuture<Void> releaseDataViewFuture = new CompletableFuture<>();
+ TestingHsDataView diskDataView =
+ TestingHsDataView.builder()
+ .setReleaseDataViewRunnable(() -> releaseDataViewFuture.complete(null))
+ .build();
+ subpartitionView.setDiskDataView(diskDataView);
+ subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
+ subpartitionView.releaseAllResources();
+ assertThat(subpartitionView.isReleased()).isTrue();
+ assertThat(releaseDataViewFuture).isCompleted();
+ }
+
+ @Test
+ void testGetConsumingOffset() {
+ AtomicInteger nextBufferIndex = new AtomicInteger(0);
+ HsSubpartitionView subpartitionView = createSubpartitionView();
+ TestingHsDataView diskDataView =
+ TestingHsDataView.builder()
+ .setConsumeBufferFunction(
+ (toConsumeBuffer) ->
+ Optional.of(
+ createBufferAndBacklog(
+ 0,
+ DataType.DATA_BUFFER,
+ nextBufferIndex.getAndIncrement())))
+ .build();
+ subpartitionView.setDiskDataView(diskDataView);
+ subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
+
+ assertThat(subpartitionView.getConsumingOffset()).isEqualTo(-1);
+ subpartitionView.getNextBuffer();
+ assertThat(subpartitionView.getConsumingOffset()).isEqualTo(0);
+ subpartitionView.getNextBuffer();
+ assertThat(subpartitionView.getConsumingOffset()).isEqualTo(1);
+ }
+
+ @Test
+ void testSetDataViewRepeatedly() {
+ HsSubpartitionView subpartitionView = createSubpartitionView();
+
+ subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
+ assertThatThrownBy(() -> subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("repeatedly set memory data view is not allowed.");
+
+ subpartitionView.setDiskDataView(TestingHsDataView.NO_OP);
+ assertThatThrownBy(() -> subpartitionView.setDiskDataView(TestingHsDataView.NO_OP))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("repeatedly set disk data view is not allowed.");
+ }
+
+ private static HsSubpartitionView createSubpartitionView() {
+ return new HsSubpartitionView(new NoOpBufferAvailablityListener());
+ }
+
+ private static HsSubpartitionView createSubpartitionView(
+ BufferAvailabilityListener bufferAvailabilityListener) {
+ return new HsSubpartitionView(bufferAvailabilityListener);
+ }
+
+ private static BufferAndBacklog createBufferAndBacklog(
+ int buffersInBacklog, DataType nextDataType, int sequenceNumber) {
+ final int bufferSize = 8;
+ Buffer buffer = HybridShuffleTestUtils.createBuffer(bufferSize, true);
+ return new BufferAndBacklog(buffer, buffersInBacklog, nextDataType, sequenceNumber);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingHsDataView.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingHsDataView.java
new file mode 100644
index 00000000000..343bd9586df
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingHsDataView.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/** Mock {@link HsDataView} for testing. */
+public class TestingHsDataView implements HsDataView {
+ public static final TestingHsDataView NO_OP = TestingHsDataView.builder().build();
+
+ private final FunctionWithException<
+ Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>
+ consumeBufferFunction;
+
+ private final Function<Integer, Buffer.DataType> peekNextToConsumeDataTypeFunction;
+
+ private final Supplier<Integer> getBacklogSupplier;
+
+ private final Runnable releaseDataViewRunnable;
+
+ private TestingHsDataView(
+ FunctionWithException<Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>
+ consumeBufferFunction,
+ Function<Integer, Buffer.DataType> peekNextToConsumeDataTypeFunction,
+ Supplier<Integer> getBacklogSupplier,
+ Runnable releaseDataViewRunnable) {
+ this.consumeBufferFunction = consumeBufferFunction;
+ this.peekNextToConsumeDataTypeFunction = peekNextToConsumeDataTypeFunction;
+ this.getBacklogSupplier = getBacklogSupplier;
+ this.releaseDataViewRunnable = releaseDataViewRunnable;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ @Override
+ public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(int nextBufferToConsume)
+ throws Throwable {
+ return consumeBufferFunction.apply(nextBufferToConsume);
+ }
+
+ @Override
+ public Buffer.DataType peekNextToConsumeDataType(int nextBufferToConsume) {
+ return peekNextToConsumeDataTypeFunction.apply(nextBufferToConsume);
+ }
+
+ @Override
+ public int getBacklog() {
+ return getBacklogSupplier.get();
+ }
+
+ @Override
+ public void releaseDataView() {
+ releaseDataViewRunnable.run();
+ }
+
+ /** Builder for {@link TestingHsDataView}. */
+ public static class Builder {
+ private FunctionWithException<
+ Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>
+ consumeBufferFunction = (ignore) -> Optional.empty();
+
+ private Function<Integer, Buffer.DataType> peekNextToConsumeDataTypeFunction =
+ (ignore) -> Buffer.DataType.NONE;
+
+ private Supplier<Integer> getBacklogSupplier = () -> 0;
+
+ private Runnable releaseDataViewRunnable = () -> {};
+
+ private Builder() {}
+
+ public Builder setConsumeBufferFunction(
+ FunctionWithException<
+ Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>
+ consumeBufferFunction) {
+ this.consumeBufferFunction = consumeBufferFunction;
+ return this;
+ }
+
+ public Builder setPeekNextToConsumeDataTypeFunction(
+ Function<Integer, Buffer.DataType> peekNextToConsumeDataTypeFunction) {
+ this.peekNextToConsumeDataTypeFunction = peekNextToConsumeDataTypeFunction;
+ return this;
+ }
+
+ public Builder setGetBacklogSupplier(Supplier<Integer> getBacklogSupplier) {
+ this.getBacklogSupplier = getBacklogSupplier;
+ return this;
+ }
+
+ public Builder setReleaseDataViewRunnable(Runnable releaseDataViewRunnable) {
+ this.releaseDataViewRunnable = releaseDataViewRunnable;
+ return this;
+ }
+
+ public TestingHsDataView build() {
+ return new TestingHsDataView(
+ consumeBufferFunction,
+ peekNextToConsumeDataTypeFunction,
+ getBacklogSupplier,
+ releaseDataViewRunnable);
+ }
+ }
+}