You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/17 03:08:37 UTC
[flink] 01/02: [FLINK-28925][runtime] HsSubpartitionMemoryDataManager returns a readOnlySlice to downstream instead of the original buffer.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a9d5def847160600948349708b5dd327896fadd7
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Aug 11 10:44:04 2022 +0800
[FLINK-28925][runtime] HsSubpartitionMemoryDataManager returns a readOnlySlice to downstream instead of the original buffer.
---
.../hybrid/HsSubpartitionMemoryDataManager.java | 5 ++++-
.../hybrid/HsSubpartitionMemoryDataManagerTest.java | 20 ++++++++++++++++++++
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
index 7d7b529d5af..2c101881a5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
@@ -152,7 +152,10 @@ public class HsSubpartitionMemoryDataManager implements HsDataView {
return bufferAndNextDataType.map(
tuple ->
new BufferAndBacklog(
- tuple.f0.getBuffer(), getBacklog(), tuple.f1, toConsumeIndex));
+ tuple.f0.getBuffer().readOnlySlice(),
+ getBacklog(),
+ tuple.f1,
+ toConsumeIndex));
}
@SuppressWarnings("FieldAccessNotGuarded")
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java
index 3edf397cfcb..c8b435df3e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.ConsumeStatus;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.SpillStatus;
@@ -182,6 +183,25 @@ class HsSubpartitionMemoryDataManagerTest {
assertThat(subpartitionMemoryDataManager.consumeBuffer(2)).isPresent();
}
+ @Test
+ void testConsumeBufferReturnSlice() throws Exception {
+ TestingMemoryDataManagerOperation memoryDataManagerOperation =
+ TestingMemoryDataManagerOperation.builder()
+ .setRequestBufferFromPoolSupplier(() -> createBufferBuilder(RECORD_SIZE))
+ .build();
+ HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
+ createSubpartitionMemoryDataManager(memoryDataManagerOperation);
+
+ subpartitionMemoryDataManager.append(createRecord(0), DataType.DATA_BUFFER);
+
+ Optional<BufferAndBacklog> bufferOpt = subpartitionMemoryDataManager.consumeBuffer(0);
+ assertThat(bufferOpt)
+ .hasValueSatisfying(
+ (bufferAndBacklog ->
+ assertThat(bufferAndBacklog.buffer())
+ .isInstanceOf(ReadOnlySlicedNetworkBuffer.class)));
+ }
+
@Test
void testConsumeBuffer() throws Exception {
List<BufferIndexAndChannel> consumedBufferIndexAndChannel = new ArrayList<>();