You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/11/16 08:02:18 UTC
[flink] 02/03: [FLINK-29818] Calculate the release number instead of the survival number for full spilling strategy
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 133ffab4f19b91d5a5fba58684e2a30a6b8b31c7
Author: Weijie Guo <re...@163.com>
AuthorDate: Tue Nov 1 10:42:38 2022 +0800
[FLINK-29818] Calculate the release number instead of the survival number for full spilling strategy
---
.../partition/hybrid/HsFullSpillingStrategy.java | 20 ++++++++++-------
.../hybrid/HsFullSpillingStrategyTest.java | 25 ++++------------------
.../partition/hybrid/HybridShuffleTestUtils.java | 4 ----
3 files changed, 16 insertions(+), 33 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
index 94a067fbaeb..1d0315a0364 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
@@ -128,20 +128,21 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy {
return;
}
- int survivedNum = (int) (poolSize - poolSize * releaseBufferRatio);
+ int releaseNum = (int) (poolSize * releaseBufferRatio);
int numSubpartitions = spillingInfoProvider.getNumSubpartitions();
- int subpartitionSurvivedNum = survivedNum / numSubpartitions;
-
+ int expectedSubpartitionReleaseNum = releaseNum / numSubpartitions;
TreeMap<Integer, Deque<BufferIndexAndChannel>> bufferToRelease = new TreeMap<>();
for (int subpartitionId = 0; subpartitionId < numSubpartitions; subpartitionId++) {
Deque<BufferIndexAndChannel> buffersInOrder =
spillingInfoProvider.getBuffersInOrder(
subpartitionId, SpillStatus.SPILL, ConsumeStatusWithId.ALL_ANY);
- // if the number of subpartition buffers less than survived buffers, reserved all of
- // them.
- int releaseNum = Math.max(0, buffersInOrder.size() - subpartitionSurvivedNum);
- while (releaseNum-- != 0) {
+ // if the number of subpartition spilling buffers is less than the expected release number,
+ // release all of them.
+ int subpartitionReleaseNum =
+ Math.min(buffersInOrder.size(), expectedSubpartitionReleaseNum);
+ int subpartitionSurvivedNum = buffersInOrder.size() - subpartitionReleaseNum;
+ while (subpartitionSurvivedNum-- != 0) {
buffersInOrder.pollLast();
}
bufferToRelease.put(subpartitionId, buffersInOrder);
@@ -149,7 +150,10 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy {
// collect results in order
for (int i = 0; i < numSubpartitions; i++) {
- builder.addBufferToRelease(i, bufferToRelease.getOrDefault(i, new ArrayDeque<>()));
+ Deque<BufferIndexAndChannel> bufferIndexAndChannels = bufferToRelease.get(i);
+ if (bufferIndexAndChannels != null && !bufferIndexAndChannels.isEmpty()) {
+ builder.addBufferToRelease(i, bufferToRelease.getOrDefault(i, new ArrayDeque<>()));
+ }
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
index e88918b2c0d..ca4ea9348dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
@@ -102,25 +102,10 @@ class HsFullSpillingStrategyTest {
final int subpartition1 = 0;
final int subpartition2 = 1;
- final int progress1 = 10;
- final int progress2 = 20;
-
List<BufferIndexAndChannel> subpartitionBuffers1 =
- createBufferIndexAndChannelsList(
- subpartition1,
- progress1,
- progress1 + 2,
- progress1 + 4,
- progress1 + 6,
- progress1 + 8);
+ createBufferIndexAndChannelsList(subpartition1, 1, 2, 3, 4, 5);
List<BufferIndexAndChannel> subpartitionBuffers2 =
- createBufferIndexAndChannelsList(
- subpartition2,
- progress2 + 1,
- progress2 + 3,
- progress2 + 5,
- progress2 + 7,
- progress2 + 9);
+ createBufferIndexAndChannelsList(subpartition2, 1, 2, 3, 4, 5);
TestingSpillingInfoProvider spillInfoProvider =
TestingSpillingInfoProvider.builder()
@@ -133,8 +118,6 @@ class HsFullSpillingStrategyTest {
() -> (int) (10 * NUM_BUFFERS_TRIGGER_SPILLING_RATIO))
.setGetNumTotalRequestedBuffersSupplier(() -> 10)
.setGetPoolSizeSupplier(() -> 10)
- .setGetNextBufferIndexToConsumeSupplier(
- () -> Arrays.asList(progress1, progress2))
.build();
Decision decision = spillStrategy.decideActionWithGlobalInfo(spillInfoProvider);
@@ -149,9 +132,9 @@ class HsFullSpillingStrategyTest {
Map<Integer, List<BufferIndexAndChannel>> expectedReleaseBuffers = new HashMap<>();
expectedReleaseBuffers.put(
- subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0, 2)));
+ subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0, 3)));
expectedReleaseBuffers.put(
- subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 3)));
+ subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 4)));
assertThat(decision.getBufferToRelease()).isEqualTo(expectedReleaseBuffers);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
index f32e3179563..bd26b6d56a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.io.network.partition.hybrid;
-import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.metrics.util.TestCounter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -39,9 +38,6 @@ public class HybridShuffleTestUtils {
int subpartitionId, int... bufferIndexes) {
List<BufferIndexAndChannel> bufferIndexAndChannels = new ArrayList<>();
for (int bufferIndex : bufferIndexes) {
- MemorySegment segment =
- MemorySegmentFactory.allocateUnpooledSegment(MEMORY_SEGMENT_SIZE);
- NetworkBuffer buffer = new NetworkBuffer(segment, (ignore) -> {});
bufferIndexAndChannels.add(new BufferIndexAndChannel(bufferIndex, subpartitionId));
}
return bufferIndexAndChannels;