Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/17 03:08:37 UTC

[flink] 01/02: [FLINK-28925][runtime] HsSubpartitionMemoryDataManager returns a readOnlySlice to downstream instead of the original buffer.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a9d5def847160600948349708b5dd327896fadd7
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Aug 11 10:44:04 2022 +0800

    [FLINK-28925][runtime] HsSubpartitionMemoryDataManager returns a readOnlySlice to downstream instead of the original buffer.
---
 .../hybrid/HsSubpartitionMemoryDataManager.java      |  5 ++++-
 .../hybrid/HsSubpartitionMemoryDataManagerTest.java  | 20 ++++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)
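
[Editor's note, not part of the commit] A minimal sketch of the behavior this patch introduces: Buffer#readOnlySlice() wraps the same underlying memory segment in a ReadOnlySlicedNetworkBuffer, so the downstream consumer shares the data but cannot mutate the buffer still held by HsSubpartitionMemoryDataManager. The class name ReadOnlySliceSketch and the unpooled-segment/FreeingBufferRecycler setup below are illustrative assumptions, not code from this commit.

import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;

public class ReadOnlySliceSketch {
    public static void main(String[] args) {
        // Back a buffer with a small unpooled segment (a stand-in for a pooled network buffer).
        NetworkBuffer original =
                new NetworkBuffer(
                        MemorySegmentFactory.allocateUnpooledSegment(1024),
                        FreeingBufferRecycler.INSTANCE);
        original.setSize(16); // pretend 16 bytes of record data were written

        // What consumeBuffer(...) now hands to the downstream view instead of `original`.
        Buffer slice = original.readOnlySlice();

        System.out.println(slice.isReadOnly());                                // true
        System.out.println(slice.readableBytes() == original.readableBytes()); // true, same data
    }
}

The read-only slice still references the same memory, so no copy is made; it only prevents the consumer from altering the writer index or contents of the buffer the memory data manager keeps for spilling.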

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
index 7d7b529d5af..2c101881a5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
@@ -152,7 +152,10 @@ public class HsSubpartitionMemoryDataManager implements HsDataView {
         return bufferAndNextDataType.map(
                 tuple ->
                         new BufferAndBacklog(
-                                tuple.f0.getBuffer(), getBacklog(), tuple.f1, toConsumeIndex));
+                                tuple.f0.getBuffer().readOnlySlice(),
+                                getBacklog(),
+                                tuple.f1,
+                                toConsumeIndex));
     }
 
     @SuppressWarnings("FieldAccessNotGuarded")
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java
index 3edf397cfcb..c8b435df3e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.ConsumeStatus;
 import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.SpillStatus;
@@ -182,6 +183,25 @@ class HsSubpartitionMemoryDataManagerTest {
         assertThat(subpartitionMemoryDataManager.consumeBuffer(2)).isPresent();
     }
 
+    @Test
+    void testConsumeBufferReturnSlice() throws Exception {
+        TestingMemoryDataManagerOperation memoryDataManagerOperation =
+                TestingMemoryDataManagerOperation.builder()
+                        .setRequestBufferFromPoolSupplier(() -> createBufferBuilder(RECORD_SIZE))
+                        .build();
+        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
+                createSubpartitionMemoryDataManager(memoryDataManagerOperation);
+
+        subpartitionMemoryDataManager.append(createRecord(0), DataType.DATA_BUFFER);
+
+        Optional<BufferAndBacklog> bufferOpt = subpartitionMemoryDataManager.consumeBuffer(0);
+        assertThat(bufferOpt)
+                .hasValueSatisfying(
+                        (bufferAndBacklog ->
+                                assertThat(bufferAndBacklog.buffer())
+                                        .isInstanceOf(ReadOnlySlicedNetworkBuffer.class)));
+    }
+
     @Test
     void testConsumeBuffer() throws Exception {
         List<BufferIndexAndChannel> consumedBufferIndexAndChannel = new ArrayList<>();