You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/11/16 08:02:18 UTC

[flink] 02/03: [FLINK-29818] Calculate the release number instead of the survival number for full spilling strategy

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 133ffab4f19b91d5a5fba58684e2a30a6b8b31c7
Author: Weijie Guo <re...@163.com>
AuthorDate: Tue Nov 1 10:42:38 2022 +0800

    [FLINK-29818] Calculate the release number instead of the survival number for full spilling strategy
---
 .../partition/hybrid/HsFullSpillingStrategy.java   | 20 ++++++++++-------
 .../hybrid/HsFullSpillingStrategyTest.java         | 25 ++++------------------
 .../partition/hybrid/HybridShuffleTestUtils.java   |  4 ----
 3 files changed, 16 insertions(+), 33 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
index 94a067fbaeb..1d0315a0364 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
@@ -128,20 +128,21 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy {
             return;
         }
 
-        int survivedNum = (int) (poolSize - poolSize * releaseBufferRatio);
+        int releaseNum = (int) (poolSize * releaseBufferRatio);
         int numSubpartitions = spillingInfoProvider.getNumSubpartitions();
-        int subpartitionSurvivedNum = survivedNum / numSubpartitions;
-
+        int expectedSubpartitionReleaseNum = releaseNum / numSubpartitions;
         TreeMap<Integer, Deque<BufferIndexAndChannel>> bufferToRelease = new TreeMap<>();
 
         for (int subpartitionId = 0; subpartitionId < numSubpartitions; subpartitionId++) {
             Deque<BufferIndexAndChannel> buffersInOrder =
                     spillingInfoProvider.getBuffersInOrder(
                             subpartitionId, SpillStatus.SPILL, ConsumeStatusWithId.ALL_ANY);
-            // if the number of subpartition buffers less than survived buffers, reserved all of
-            // them.
-            int releaseNum = Math.max(0, buffersInOrder.size() - subpartitionSurvivedNum);
-            while (releaseNum-- != 0) {
+            // If the subpartition has fewer spilling buffers than the expected release number,
+            // release all of them.
+            int subpartitionReleaseNum =
+                    Math.min(buffersInOrder.size(), expectedSubpartitionReleaseNum);
+            int subpartitionSurvivedNum = buffersInOrder.size() - subpartitionReleaseNum;
+            while (subpartitionSurvivedNum-- != 0) {
                 buffersInOrder.pollLast();
             }
             bufferToRelease.put(subpartitionId, buffersInOrder);
@@ -149,7 +150,10 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy {
 
         // collect results in order
         for (int i = 0; i < numSubpartitions; i++) {
-            builder.addBufferToRelease(i, bufferToRelease.getOrDefault(i, new ArrayDeque<>()));
+            Deque<BufferIndexAndChannel> bufferIndexAndChannels = bufferToRelease.get(i);
+            if (bufferIndexAndChannels != null && !bufferIndexAndChannels.isEmpty()) {
+                builder.addBufferToRelease(i, bufferToRelease.getOrDefault(i, new ArrayDeque<>()));
+            }
         }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
index e88918b2c0d..ca4ea9348dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
@@ -102,25 +102,10 @@ class HsFullSpillingStrategyTest {
         final int subpartition1 = 0;
         final int subpartition2 = 1;
 
-        final int progress1 = 10;
-        final int progress2 = 20;
-
         List<BufferIndexAndChannel> subpartitionBuffers1 =
-                createBufferIndexAndChannelsList(
-                        subpartition1,
-                        progress1,
-                        progress1 + 2,
-                        progress1 + 4,
-                        progress1 + 6,
-                        progress1 + 8);
+                createBufferIndexAndChannelsList(subpartition1, 1, 2, 3, 4, 5);
         List<BufferIndexAndChannel> subpartitionBuffers2 =
-                createBufferIndexAndChannelsList(
-                        subpartition2,
-                        progress2 + 1,
-                        progress2 + 3,
-                        progress2 + 5,
-                        progress2 + 7,
-                        progress2 + 9);
+                createBufferIndexAndChannelsList(subpartition2, 1, 2, 3, 4, 5);
 
         TestingSpillingInfoProvider spillInfoProvider =
                 TestingSpillingInfoProvider.builder()
@@ -133,8 +118,6 @@ class HsFullSpillingStrategyTest {
                                 () -> (int) (10 * NUM_BUFFERS_TRIGGER_SPILLING_RATIO))
                         .setGetNumTotalRequestedBuffersSupplier(() -> 10)
                         .setGetPoolSizeSupplier(() -> 10)
-                        .setGetNextBufferIndexToConsumeSupplier(
-                                () -> Arrays.asList(progress1, progress2))
                         .build();
 
         Decision decision = spillStrategy.decideActionWithGlobalInfo(spillInfoProvider);
@@ -149,9 +132,9 @@ class HsFullSpillingStrategyTest {
 
         Map<Integer, List<BufferIndexAndChannel>> expectedReleaseBuffers = new HashMap<>();
         expectedReleaseBuffers.put(
-                subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0, 2)));
+                subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0, 3)));
         expectedReleaseBuffers.put(
-                subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 3)));
+                subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 4)));
         assertThat(decision.getBufferToRelease()).isEqualTo(expectedReleaseBuffers);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
index f32e3179563..bd26b6d56a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid;
 
-import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.metrics.util.TestCounter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -39,9 +38,6 @@ public class HybridShuffleTestUtils {
             int subpartitionId, int... bufferIndexes) {
         List<BufferIndexAndChannel> bufferIndexAndChannels = new ArrayList<>();
         for (int bufferIndex : bufferIndexes) {
-            MemorySegment segment =
-                    MemorySegmentFactory.allocateUnpooledSegment(MEMORY_SEGMENT_SIZE);
-            NetworkBuffer buffer = new NetworkBuffer(segment, (ignore) -> {});
             bufferIndexAndChannels.add(new BufferIndexAndChannel(bufferIndex, subpartitionId));
         }
         return bufferIndexAndChannels;