You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/28 08:49:30 UTC
[flink] branch release-1.16 updated: [FLINK-29425] Hybrid full spilling strategy triggering spilling frequently
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new 790434b9fb7 [FLINK-29425] Hybrid full spilling strategy triggering spilling frequently
790434b9fb7 is described below
commit 790434b9fb7296420c1ea15af0d640273776d0b0
Author: Weijie Guo <re...@163.com>
AuthorDate: Tue Sep 27 15:24:30 2022 +0800
[FLINK-29425] Hybrid full spilling strategy triggering spilling frequently
This closes #20904
---
.../partition/hybrid/HsFullSpillingStrategy.java | 32 +++++++------
.../partition/hybrid/HsMemoryDataManager.java | 39 ++++++++++++++--
.../partition/hybrid/HsResultPartition.java | 3 +-
.../hybrid/HsSelectiveSpillingStrategy.java | 2 +-
.../partition/hybrid/HsSpillingStrategy.java | 2 +-
.../hybrid/HybridShuffleConfiguration.java | 53 +++++++++++++++-------
.../hybrid/HsFullSpillingStrategyTest.java | 20 +++++---
.../partition/hybrid/HsMemoryDataManagerTest.java | 49 ++++++++++++++++++--
.../partition/hybrid/HsResultPartitionTest.java | 28 ------------
.../hybrid/HsSelectiveSpillingStrategyTest.java | 2 +-
.../partition/hybrid/HsSubpartitionViewTest.java | 3 +-
.../partition/hybrid/TestingSpillingStrategy.java | 14 +++---
12 files changed, 164 insertions(+), 83 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
index cfc737d2efd..f0339ee152a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
@@ -32,25 +32,25 @@ import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStr
/** A special implementation of {@link HsSpillingStrategy} that spilled all buffers to disk. */
public class HsFullSpillingStrategy implements HsSpillingStrategy {
- private final int numBuffersTriggerSpilling;
+ private final float numBuffersTriggerSpillingRatio;
private final float releaseBufferRatio;
private final float releaseThreshold;
public HsFullSpillingStrategy(HybridShuffleConfiguration hybridShuffleConfiguration) {
- this.numBuffersTriggerSpilling =
- hybridShuffleConfiguration.getFullStrategyNumBuffersTriggerSpilling();
+ this.numBuffersTriggerSpillingRatio =
+ hybridShuffleConfiguration.getFullStrategyNumBuffersTriggerSpillingRatio();
this.releaseThreshold = hybridShuffleConfiguration.getFullStrategyReleaseThreshold();
this.releaseBufferRatio = hybridShuffleConfiguration.getFullStrategyReleaseBufferRatio();
}
// For the case of buffer finished, whenever the number of unSpillBuffers reaches
- // numBuffersTriggerSpilling, make a decision based on global information. Otherwise, no need to
- // take action.
+ // numBuffersTriggerSpillingRatio times currentPoolSize, make a decision based on global
+ // information. Otherwise, no need to take action.
@Override
- public Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers) {
- return numTotalUnSpillBuffers < numBuffersTriggerSpilling
+ public Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers, int currentPoolSize) {
+ return numTotalUnSpillBuffers < numBuffersTriggerSpillingRatio * currentPoolSize
? Optional.of(Decision.NO_ACTION)
: Optional.empty();
}
@@ -74,8 +74,11 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy {
@Override
public Decision decideActionWithGlobalInfo(HsSpillingInfoProvider spillingInfoProvider) {
Decision.Builder builder = Decision.builder();
- checkSpill(spillingInfoProvider, builder);
- checkRelease(spillingInfoProvider, builder);
+ // Read the pool size once to save locking cost. If it changes between checkSpill and
+ // checkRelease, the pool size checker will handle the inconsistency.
+ int poolSize = spillingInfoProvider.getPoolSize();
+ checkSpill(spillingInfoProvider, poolSize, builder);
+ checkRelease(spillingInfoProvider, poolSize, builder);
return builder.build();
}
@@ -99,8 +102,10 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy {
return builder.build();
}
- private void checkSpill(HsSpillingInfoProvider spillingInfoProvider, Decision.Builder builder) {
- if (spillingInfoProvider.getNumTotalUnSpillBuffers() < numBuffersTriggerSpilling) {
+ private void checkSpill(
+ HsSpillingInfoProvider spillingInfoProvider, int poolSize, Decision.Builder builder) {
+ if (spillingInfoProvider.getNumTotalUnSpillBuffers()
+ < numBuffersTriggerSpillingRatio * poolSize) {
// In case situation changed since onBufferFinished() returns Optional#empty()
return;
}
@@ -114,9 +119,8 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy {
}
private void checkRelease(
- HsSpillingInfoProvider spillingInfoProvider, Decision.Builder builder) {
- if (spillingInfoProvider.getNumTotalRequestedBuffers()
- < spillingInfoProvider.getPoolSize() * releaseThreshold) {
+ HsSpillingInfoProvider spillingInfoProvider, int poolSize, Decision.Builder builder) {
+ if (spillingInfoProvider.getNumTotalRequestedBuffers() < poolSize * releaseThreshold) {
// In case situation changed since onMemoryUsageChanged() returns Optional#empty()
return;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
index 22a9408a01f..4f45b491aa2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy.Decision;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.SupplierWithException;
@@ -40,6 +41,9 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -71,6 +75,17 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
private final Map<Integer, HsSubpartitionViewInternalOperations> subpartitionViewOperationsMap =
new ConcurrentHashMap<>();
+ /**
+ * Currently, it is only used to periodically check the actual size of the local buffer pool
+ * (the size changes dynamically due to the redistribution of network buffers). When the size
+ * of the buffer pool decreases, it attempts to trigger the spilling strategy.
+ */
+ private final ScheduledExecutorService poolSizeChecker =
+ Executors.newSingleThreadScheduledExecutor(
+ new ExecutorThreadFactory("hybrid-shuffle-pool-size-checker-executor"));
+
+ private final AtomicInteger poolSize;
+
public HsMemoryDataManager(
int numSubpartitions,
int bufferSize,
@@ -78,7 +93,8 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
HsSpillingStrategy spillStrategy,
HsFileDataIndex fileDataIndex,
Path dataFilePath,
- BufferCompressor bufferCompressor)
+ BufferCompressor bufferCompressor,
+ long poolSizeCheckInterval)
throws IOException {
this.numSubpartitions = numSubpartitions;
this.bufferPool = bufferPool;
@@ -99,6 +115,22 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
bufferCompressor,
this);
}
+
+ poolSize = new AtomicInteger(this.bufferPool.getNumBuffers());
+
+ if (poolSizeCheckInterval > 0) {
+ poolSizeChecker.scheduleAtFixedRate(
+ () -> {
+ int newSize = this.bufferPool.getNumBuffers();
+ int oldSize = poolSize.getAndSet(newSize);
+ if (oldSize > newSize) {
+ callWithLock(() -> spillStrategy.decideActionWithGlobalInfo(this));
+ }
+ },
+ poolSizeCheckInterval,
+ poolSizeCheckInterval,
+ TimeUnit.MILLISECONDS);
+ }
}
// ------------------------------------
@@ -144,6 +176,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
Decision decision = callWithLock(() -> spillStrategy.onResultPartitionClosed(this));
handleDecision(Optional.of(decision));
spiller.close();
+ poolSizeChecker.shutdown();
}
/**
@@ -168,7 +201,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
@Override
public int getPoolSize() {
- return bufferPool.getNumBuffers();
+ return poolSize.get();
}
@Override
@@ -240,7 +273,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
@Override
public void onBufferFinished() {
Optional<Decision> decision =
- spillStrategy.onBufferFinished(numUnSpillBuffers.incrementAndGet());
+ spillStrategy.onBufferFinished(numUnSpillBuffers.incrementAndGet(), getPoolSize());
handleDecision(decision);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
index 5d537aa7466..9b75f1e3724 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
@@ -127,7 +127,8 @@ public class HsResultPartition extends ResultPartition {
getSpillingStrategy(hybridShuffleConfiguration),
dataIndex,
dataFilePath,
- bufferCompressor);
+ bufferCompressor,
+ hybridShuffleConfiguration.getBufferPoolSizeCheckIntervalMs());
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
index dcd53393bdf..dc356cb20d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
@@ -45,7 +45,7 @@ public class HsSelectiveSpillingStrategy implements HsSpillingStrategy {
// For the case of buffer finished, there is no need to take action for
// HsSelectiveSpillingStrategy.
@Override
- public Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers) {
+ public Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers, int currentPoolSize) {
return Optional.of(Decision.NO_ACTION);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
index fb4a5ab58d5..6d3a15d427a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
@@ -51,7 +51,7 @@ public interface HsSpillingStrategy {
* @return A {@link Decision} based on the provided information, or {@link Optional#empty()} if
* the decision cannot be made, which indicates global information is needed.
*/
- Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers);
+ Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers, int currentPoolSize);
/**
* Make a decision when a buffer is consumed.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
index 18cadb71b05..f3df0b2ab5c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
@@ -30,12 +30,14 @@ public class HybridShuffleConfiguration {
private static final float DEFAULT_SELECTIVE_STRATEGY_SPILL_BUFFER_RATIO = 0.4f;
- private static final int DEFAULT_FULL_STRATEGY_NUM_BUFFERS_TRIGGER_SPILLED = 10;
+ private static final float DEFAULT_FULL_STRATEGY_NUM_BUFFERS_TRIGGER_SPILLED_RATIO = 0.5f;
private static final float DEFAULT_FULL_STRATEGY_RELEASE_THRESHOLD = 0.7f;
private static final float DEFAULT_FULL_STRATEGY_RELEASE_BUFFER_RATIO = 0.4f;
+ private static final long DEFAULT_BUFFER_POLL_SIZE_CHECK_INTERVAL_MS = 1000;
+
private static final SpillingStrategyType DEFAULT_SPILLING_STRATEGY_NAME =
SpillingStrategyType.FULL;
@@ -57,31 +59,36 @@ public class HybridShuffleConfiguration {
// ----------------------------------------
// Full Spilling Strategy
// ----------------------------------------
- private final int fullStrategyNumBuffersTriggerSpilling;
+ private final float fullStrategyNumBuffersTriggerSpillingRatio;
private final float fullStrategyReleaseThreshold;
private final float fullStrategyReleaseBufferRatio;
+ private final long bufferPoolSizeCheckIntervalMs;
+
private HybridShuffleConfiguration(
int maxBuffersReadAhead,
Duration bufferRequestTimeout,
int maxRequestedBuffers,
float selectiveStrategySpillThreshold,
float selectiveStrategySpillBufferRatio,
- int fullStrategyNumBuffersTriggerSpilling,
+ float fullStrategyNumBuffersTriggerSpillingRatio,
float fullStrategyReleaseThreshold,
float fullStrategyReleaseBufferRatio,
- SpillingStrategyType spillingStrategyType) {
+ SpillingStrategyType spillingStrategyType,
+ long bufferPoolSizeCheckIntervalMs) {
this.maxBuffersReadAhead = maxBuffersReadAhead;
this.bufferRequestTimeout = bufferRequestTimeout;
this.maxRequestedBuffers = maxRequestedBuffers;
this.selectiveStrategySpillThreshold = selectiveStrategySpillThreshold;
this.selectiveStrategySpillBufferRatio = selectiveStrategySpillBufferRatio;
- this.fullStrategyNumBuffersTriggerSpilling = fullStrategyNumBuffersTriggerSpilling;
+ this.fullStrategyNumBuffersTriggerSpillingRatio =
+ fullStrategyNumBuffersTriggerSpillingRatio;
this.fullStrategyReleaseThreshold = fullStrategyReleaseThreshold;
this.fullStrategyReleaseBufferRatio = fullStrategyReleaseBufferRatio;
this.spillingStrategyType = spillingStrategyType;
+ this.bufferPoolSizeCheckIntervalMs = bufferPoolSizeCheckIntervalMs;
}
public static Builder builder(int numSubpartitions, int numBuffersPerRequest) {
@@ -127,11 +134,11 @@ public class HybridShuffleConfiguration {
}
/**
- * When the number of unSpilled buffers equal to this value, trigger the spilling operation.
- * Used by {@link HsFullSpillingStrategy}.
+ * When the number of unspilled buffers reaches this ratio times the pool size, the spilling
+ * operation is triggered. Used by {@link HsFullSpillingStrategy}.
*/
- public int getFullStrategyNumBuffersTriggerSpilling() {
- return fullStrategyNumBuffersTriggerSpilling;
+ public float getFullStrategyNumBuffersTriggerSpillingRatio() {
+ return fullStrategyNumBuffersTriggerSpillingRatio;
}
/**
@@ -147,6 +154,11 @@ public class HybridShuffleConfiguration {
return fullStrategyReleaseBufferRatio;
}
+ /** Interval, in milliseconds, at which the buffer pool's size is checked. */
+ public long getBufferPoolSizeCheckIntervalMs() {
+ return bufferPoolSizeCheckIntervalMs;
+ }
+
/** Type of {@link HsSpillingStrategy}. */
public enum SpillingStrategyType {
FULL,
@@ -164,13 +176,15 @@ public class HybridShuffleConfiguration {
private float selectiveStrategySpillBufferRatio =
DEFAULT_SELECTIVE_STRATEGY_SPILL_BUFFER_RATIO;
- private int fullStrategyNumBuffersTriggerSpilling =
- DEFAULT_FULL_STRATEGY_NUM_BUFFERS_TRIGGER_SPILLED;
+ private float fullStrategyNumBuffersTriggerSpillingRatio =
+ DEFAULT_FULL_STRATEGY_NUM_BUFFERS_TRIGGER_SPILLED_RATIO;
private float fullStrategyReleaseThreshold = DEFAULT_FULL_STRATEGY_RELEASE_THRESHOLD;
private float fullStrategyReleaseBufferRatio = DEFAULT_FULL_STRATEGY_RELEASE_BUFFER_RATIO;
+ private long bufferPoolSizeCheckIntervalMs = DEFAULT_BUFFER_POLL_SIZE_CHECK_INTERVAL_MS;
+
private SpillingStrategyType spillingStrategyType = DEFAULT_SPILLING_STRATEGY_NAME;
private final int numSubpartitions;
@@ -203,9 +217,10 @@ public class HybridShuffleConfiguration {
return this;
}
- public Builder setFullStrategyNumBuffersTriggerSpilling(
- int fullStrategyNumBuffersTriggerSpilling) {
- this.fullStrategyNumBuffersTriggerSpilling = fullStrategyNumBuffersTriggerSpilling;
+ public Builder setFullStrategyNumBuffersTriggerSpillingRatio(
+ float fullStrategyNumBuffersTriggerSpillingRatio) {
+ this.fullStrategyNumBuffersTriggerSpillingRatio =
+ fullStrategyNumBuffersTriggerSpillingRatio;
return this;
}
@@ -224,6 +239,11 @@ public class HybridShuffleConfiguration {
return this;
}
+ public Builder setBufferPoolSizeCheckIntervalMs(long bufferPoolSizeCheckIntervalMs) {
+ this.bufferPoolSizeCheckIntervalMs = bufferPoolSizeCheckIntervalMs;
+ return this;
+ }
+
public HybridShuffleConfiguration build() {
return new HybridShuffleConfiguration(
maxBuffersReadAhead,
@@ -231,10 +251,11 @@ public class HybridShuffleConfiguration {
Math.max(2 * numBuffersPerRequest, numSubpartitions),
selectiveStrategySpillThreshold,
selectiveStrategySpillBufferRatio,
- fullStrategyNumBuffersTriggerSpilling,
+ fullStrategyNumBuffersTriggerSpillingRatio,
fullStrategyReleaseThreshold,
fullStrategyReleaseBufferRatio,
- spillingStrategyType);
+ spillingStrategyType,
+ bufferPoolSizeCheckIntervalMs);
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
index 44701cbab76..3602b15b2d1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
@@ -38,7 +38,7 @@ import static org.assertj.core.api.Assertions.entry;
class HsFullSpillingStrategyTest {
public static final int NUM_SUBPARTITIONS = 2;
- public static final int NUM_BUFFERS_TRIGGER_SPILLING = 2;
+ public static final float NUM_BUFFERS_TRIGGER_SPILLING_RATIO = 0.2f;
public static final float FULL_SPILL_RELEASE_THRESHOLD = 0.8f;
@@ -47,24 +47,31 @@ class HsFullSpillingStrategyTest {
private final HsSpillingStrategy spillStrategy =
new HsFullSpillingStrategy(
HybridShuffleConfiguration.builder(NUM_SUBPARTITIONS, 1)
- .setFullStrategyNumBuffersTriggerSpilling(NUM_BUFFERS_TRIGGER_SPILLING)
+ .setFullStrategyNumBuffersTriggerSpillingRatio(
+ NUM_BUFFERS_TRIGGER_SPILLING_RATIO)
.setFullStrategyReleaseThreshold(FULL_SPILL_RELEASE_THRESHOLD)
.setFullStrategyReleaseBufferRatio(FULL_SPILL_RELEASE_RATIO)
.build());
@Test
void testOnBufferFinishedUnSpillBufferBelowThreshold() {
+ final int poolSize = 10;
Optional<Decision> finishedDecision =
- spillStrategy.onBufferFinished(NUM_BUFFERS_TRIGGER_SPILLING - 1);
+ spillStrategy.onBufferFinished(
+ (int) (poolSize * NUM_BUFFERS_TRIGGER_SPILLING_RATIO) - 1, poolSize);
assertThat(finishedDecision).hasValue(Decision.NO_ACTION);
}
@Test
void testOnBufferFinishedUnSpillBufferEqualToOrGreatThenThreshold() {
+ final int poolSize = 10;
Optional<Decision> finishedDecision =
- spillStrategy.onBufferFinished(NUM_BUFFERS_TRIGGER_SPILLING);
+ spillStrategy.onBufferFinished(
+ (int) (poolSize * NUM_BUFFERS_TRIGGER_SPILLING_RATIO), poolSize);
assertThat(finishedDecision).isNotPresent();
- finishedDecision = spillStrategy.onBufferFinished(NUM_BUFFERS_TRIGGER_SPILLING + 1);
+ finishedDecision =
+ spillStrategy.onBufferFinished(
+ (int) (poolSize * NUM_BUFFERS_TRIGGER_SPILLING_RATIO) + 1, poolSize);
assertThat(finishedDecision).isNotPresent();
}
@@ -125,7 +132,8 @@ class HsFullSpillingStrategyTest {
.addConsumedBuffers(subpartition1, Arrays.asList(0, 1))
.addSpillBuffers(subpartition2, Arrays.asList(1, 2, 3))
.addConsumedBuffers(subpartition2, Arrays.asList(0, 1))
- .setGetNumTotalUnSpillBuffersSupplier(() -> NUM_BUFFERS_TRIGGER_SPILLING)
+ .setGetNumTotalUnSpillBuffersSupplier(
+ () -> (int) (10 * NUM_BUFFERS_TRIGGER_SPILLING_RATIO))
.setGetNumTotalRequestedBuffersSupplier(() -> 10)
.setGetPoolSizeSupplier(() -> 10)
.setGetNextBufferIndexToConsumeSupplier(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java
index 4cf1bc88c94..773b847ecc1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java
@@ -65,7 +65,7 @@ class HsMemoryDataManagerTest {
HsSpillingStrategy spillingStrategy =
TestingSpillingStrategy.builder()
.setOnBufferFinishedFunction(
- (numTotalUnSpillBuffers) -> {
+ (numTotalUnSpillBuffers, currentPoolSize) -> {
finishedBuffers.incrementAndGet();
return Optional.of(Decision.NO_ACTION);
})
@@ -120,7 +120,7 @@ class HsMemoryDataManagerTest {
HsSpillingStrategy spillingStrategy =
TestingSpillingStrategy.builder()
.setOnBufferFinishedFunction(
- (numFinishedBuffers) -> {
+ (numFinishedBuffers, poolSize) -> {
if (numFinishedBuffers < numFinishedBufferToTriggerDecision) {
return Optional.of(Decision.NO_ACTION);
}
@@ -160,7 +160,7 @@ class HsMemoryDataManagerTest {
HsSpillingStrategy spillingStrategy =
TestingSpillingStrategy.builder()
.setOnBufferFinishedFunction(
- (finishedBuffer) -> {
+ (finishedBuffer, poolSize) -> {
// return empty optional to trigger global decision.
return Optional.empty();
})
@@ -192,6 +192,34 @@ class HsMemoryDataManagerTest {
assertThat(resultPartitionReleaseFuture).isCompleted();
}
+ @Test
+ void testPoolSizeCheck() throws Exception {
+ final int requiredBuffers = 10;
+ final int maxBuffers = 100;
+ CompletableFuture<Void> triggerGlobalDecision = new CompletableFuture<>();
+
+ NetworkBufferPool networkBufferPool = new NetworkBufferPool(maxBuffers, bufferSize);
+ BufferPool bufferPool = networkBufferPool.createBufferPool(requiredBuffers, maxBuffers);
+ assertThat(bufferPool.getNumBuffers()).isEqualTo(maxBuffers);
+
+ HsSpillingStrategy spillingStrategy =
+ TestingSpillingStrategy.builder()
+ .setDecideActionWithGlobalInfoFunction(
+ (spillingInfoProvider) -> {
+ assertThat(spillingInfoProvider.getPoolSize())
+ .isEqualTo(requiredBuffers);
+ triggerGlobalDecision.complete(null);
+ return Decision.NO_ACTION;
+ })
+ .build();
+
+ createMemoryDataManager(spillingStrategy, bufferPool);
+ networkBufferPool.createBufferPool(maxBuffers - requiredBuffers, maxBuffers);
+ assertThat(bufferPool.getNumBuffers()).isEqualTo(requiredBuffers);
+
+ assertThat(triggerGlobalDecision).succeedsWithin(10, TimeUnit.SECONDS);
+ }
+
private HsMemoryDataManager createMemoryDataManager(HsSpillingStrategy spillStrategy)
throws Exception {
return createMemoryDataManager(spillStrategy, new HsFileDataIndexImpl(NUM_SUBPARTITIONS));
@@ -201,6 +229,18 @@ class HsMemoryDataManagerTest {
HsSpillingStrategy spillStrategy, HsFileDataIndex fileDataIndex) throws Exception {
NetworkBufferPool networkBufferPool = new NetworkBufferPool(NUM_BUFFERS, bufferSize);
BufferPool bufferPool = networkBufferPool.createBufferPool(poolSize, poolSize);
+ return createMemoryDataManager(bufferPool, spillStrategy, fileDataIndex);
+ }
+
+ private HsMemoryDataManager createMemoryDataManager(
+ HsSpillingStrategy spillingStrategy, BufferPool bufferPool) throws Exception {
+ return createMemoryDataManager(
+ bufferPool, spillingStrategy, new HsFileDataIndexImpl(NUM_SUBPARTITIONS));
+ }
+
+ private HsMemoryDataManager createMemoryDataManager(
+ BufferPool bufferPool, HsSpillingStrategy spillStrategy, HsFileDataIndex fileDataIndex)
+ throws Exception {
HsMemoryDataManager memoryDataManager =
new HsMemoryDataManager(
NUM_SUBPARTITIONS,
@@ -209,7 +249,8 @@ class HsMemoryDataManagerTest {
spillStrategy,
fileDataIndex,
dataFilePath,
- null);
+ null,
+ 1000);
memoryDataManager.setOutputMetrics(createTestingOutputMetrics());
return memoryDataManager;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
index c1086b95f7d..826a7b243f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
@@ -41,7 +41,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
-import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration.SpillingStrategyType;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.util.IOUtils;
@@ -380,33 +379,6 @@ class HsResultPartitionTest {
return ByteBuffer.wrap(dataWritten);
}
- private HsResultPartition createHsResultPartition(
- int numSubpartitions, BufferPool bufferPool, int numBuffersTriggerSpilling)
- throws IOException {
- HsResultPartition hsResultPartition =
- new HsResultPartition(
- "HsResultPartitionTest",
- 0,
- new ResultPartitionID(),
- ResultPartitionType.HYBRID_FULL,
- numSubpartitions,
- numSubpartitions,
- readBufferPool,
- readIOExecutor,
- new ResultPartitionManager(),
- fileChannelManager.createChannel().getPath(),
- bufferSize,
- HybridShuffleConfiguration.builder(
- numSubpartitions, readBufferPool.getNumBuffersPerRequest())
- .setSpillingStrategyType(SpillingStrategyType.FULL)
- .setFullStrategyNumBuffersTriggerSpilling(numBuffersTriggerSpilling)
- .build(),
- null,
- () -> bufferPool);
- hsResultPartition.setup();
- return hsResultPartition;
- }
-
private HsResultPartition createHsResultPartition(int numSubpartitions, BufferPool bufferPool)
throws IOException {
HsResultPartition hsResultPartition =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java
index dfeb7a0f425..04328fbc32b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java
@@ -49,7 +49,7 @@ class HsSelectiveSpillingStrategyTest {
@Test
void testOnBufferFinished() {
- Optional<Decision> finishedDecision = spillStrategy.onBufferFinished(5);
+ Optional<Decision> finishedDecision = spillStrategy.onBufferFinished(5, 10);
assertThat(finishedDecision).hasValue(Decision.NO_ACTION);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
index aedb0602c83..451ab8980da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
@@ -114,7 +114,8 @@ class HsSubpartitionViewTest {
spillingStrategy,
new HsFileDataIndexImpl(1),
dataFilePath.resolve(".data"),
- null);
+ null,
+ 0);
memoryDataManager.setOutputMetrics(createTestingOutputMetrics());
HsDataView hsDataView = memoryDataManager.registerSubpartitionView(0, subpartitionView);
subpartitionView.setMemoryDataView(hsDataView);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java
index 61a730fc5c8..48fff99db7d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java
@@ -26,7 +26,7 @@ import java.util.function.Function;
public class TestingSpillingStrategy implements HsSpillingStrategy {
private final BiFunction<Integer, Integer, Optional<Decision>> onMemoryUsageChangedFunction;
- private final Function<Integer, Optional<Decision>> onBufferFinishedFunction;
+ private final BiFunction<Integer, Integer, Optional<Decision>> onBufferFinishedFunction;
private final Function<BufferIndexAndChannel, Optional<Decision>> onBufferConsumedFunction;
@@ -36,7 +36,7 @@ public class TestingSpillingStrategy implements HsSpillingStrategy {
private TestingSpillingStrategy(
BiFunction<Integer, Integer, Optional<Decision>> onMemoryUsageChangedFunction,
- Function<Integer, Optional<Decision>> onBufferFinishedFunction,
+ BiFunction<Integer, Integer, Optional<Decision>> onBufferFinishedFunction,
Function<BufferIndexAndChannel, Optional<Decision>> onBufferConsumedFunction,
Function<HsSpillingInfoProvider, Decision> decideActionWithGlobalInfoFunction,
Function<HsSpillingInfoProvider, Decision> onResultPartitionClosedFunction) {
@@ -54,8 +54,8 @@ public class TestingSpillingStrategy implements HsSpillingStrategy {
}
@Override
- public Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers) {
- return onBufferFinishedFunction.apply(numTotalUnSpillBuffers);
+ public Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers, int currentPoolSize) {
+ return onBufferFinishedFunction.apply(numTotalUnSpillBuffers, currentPoolSize);
}
@Override
@@ -82,8 +82,8 @@ public class TestingSpillingStrategy implements HsSpillingStrategy {
private BiFunction<Integer, Integer, Optional<Decision>> onMemoryUsageChangedFunction =
(ignore1, ignore2) -> Optional.of(Decision.NO_ACTION);
- private Function<Integer, Optional<Decision>> onBufferFinishedFunction =
- (ignore) -> Optional.of(Decision.NO_ACTION);
+ private BiFunction<Integer, Integer, Optional<Decision>> onBufferFinishedFunction =
+ (ignore1, ignore2) -> Optional.of(Decision.NO_ACTION);
private Function<BufferIndexAndChannel, Optional<Decision>> onBufferConsumedFunction =
(ignore) -> Optional.of(Decision.NO_ACTION);
@@ -103,7 +103,7 @@ public class TestingSpillingStrategy implements HsSpillingStrategy {
}
public Builder setOnBufferFinishedFunction(
- Function<Integer, Optional<Decision>> onBufferFinishedFunction) {
+ BiFunction<Integer, Integer, Optional<Decision>> onBufferFinishedFunction) {
this.onBufferFinishedFunction = onBufferFinishedFunction;
return this;
}