You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/28 08:49:30 UTC

[flink] branch release-1.16 updated: [FLINK-29425] Hybrid full spilling strategy triggering spilling frequently

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new 790434b9fb7 [FLINK-29425] Hybrid full spilling strategy triggering spilling frequently
790434b9fb7 is described below

commit 790434b9fb7296420c1ea15af0d640273776d0b0
Author: Weijie Guo <re...@163.com>
AuthorDate: Tue Sep 27 15:24:30 2022 +0800

    [FLINK-29425] Hybrid full spilling strategy triggering spilling frequently
    
    This closes #20904
---
 .../partition/hybrid/HsFullSpillingStrategy.java   | 32 +++++++------
 .../partition/hybrid/HsMemoryDataManager.java      | 39 ++++++++++++++--
 .../partition/hybrid/HsResultPartition.java        |  3 +-
 .../hybrid/HsSelectiveSpillingStrategy.java        |  2 +-
 .../partition/hybrid/HsSpillingStrategy.java       |  2 +-
 .../hybrid/HybridShuffleConfiguration.java         | 53 +++++++++++++++-------
 .../hybrid/HsFullSpillingStrategyTest.java         | 20 +++++---
 .../partition/hybrid/HsMemoryDataManagerTest.java  | 49 ++++++++++++++++++--
 .../partition/hybrid/HsResultPartitionTest.java    | 28 ------------
 .../hybrid/HsSelectiveSpillingStrategyTest.java    |  2 +-
 .../partition/hybrid/HsSubpartitionViewTest.java   |  3 +-
 .../partition/hybrid/TestingSpillingStrategy.java  | 14 +++---
 12 files changed, 164 insertions(+), 83 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
index cfc737d2efd..f0339ee152a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
@@ -32,25 +32,25 @@ import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStr
 
 /** A special implementation of {@link HsSpillingStrategy} that spilled all buffers to disk. */
 public class HsFullSpillingStrategy implements HsSpillingStrategy {
-    private final int numBuffersTriggerSpilling;
+    private final float numBuffersTriggerSpillingRatio;
 
     private final float releaseBufferRatio;
 
     private final float releaseThreshold;
 
     public HsFullSpillingStrategy(HybridShuffleConfiguration hybridShuffleConfiguration) {
-        this.numBuffersTriggerSpilling =
-                hybridShuffleConfiguration.getFullStrategyNumBuffersTriggerSpilling();
+        this.numBuffersTriggerSpillingRatio =
+                hybridShuffleConfiguration.getFullStrategyNumBuffersTriggerSpillingRatio();
         this.releaseThreshold = hybridShuffleConfiguration.getFullStrategyReleaseThreshold();
         this.releaseBufferRatio = hybridShuffleConfiguration.getFullStrategyReleaseBufferRatio();
     }
 
     // For the case of buffer finished, whenever the number of unSpillBuffers reaches
-    // numBuffersTriggerSpilling, make a decision based on global information. Otherwise, no need to
-    // take action.
+    // numBuffersTriggerSpillingRatio times currentPoolSize, make a decision based on global
+    // information. Otherwise, no need to take action.
     @Override
-    public Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers) {
-        return numTotalUnSpillBuffers < numBuffersTriggerSpilling
+    public Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers, int currentPoolSize) {
+        return numTotalUnSpillBuffers < numBuffersTriggerSpillingRatio * currentPoolSize
                 ? Optional.of(Decision.NO_ACTION)
                 : Optional.empty();
     }
@@ -74,8 +74,11 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy {
     @Override
     public Decision decideActionWithGlobalInfo(HsSpillingInfoProvider spillingInfoProvider) {
         Decision.Builder builder = Decision.builder();
-        checkSpill(spillingInfoProvider, builder);
-        checkRelease(spillingInfoProvider, builder);
+        // Save the cost of lock, if pool size is changed between checkSpill and checkRelease, pool
+        // size checker will handle this inconsistency.
+        int poolSize = spillingInfoProvider.getPoolSize();
+        checkSpill(spillingInfoProvider, poolSize, builder);
+        checkRelease(spillingInfoProvider, poolSize, builder);
         return builder.build();
     }
 
@@ -99,8 +102,10 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy {
         return builder.build();
     }
 
-    private void checkSpill(HsSpillingInfoProvider spillingInfoProvider, Decision.Builder builder) {
-        if (spillingInfoProvider.getNumTotalUnSpillBuffers() < numBuffersTriggerSpilling) {
+    private void checkSpill(
+            HsSpillingInfoProvider spillingInfoProvider, int poolSize, Decision.Builder builder) {
+        if (spillingInfoProvider.getNumTotalUnSpillBuffers()
+                < numBuffersTriggerSpillingRatio * poolSize) {
             // In case situation changed since onBufferFinished() returns Optional#empty()
             return;
         }
@@ -114,9 +119,8 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy {
     }
 
     private void checkRelease(
-            HsSpillingInfoProvider spillingInfoProvider, Decision.Builder builder) {
-        if (spillingInfoProvider.getNumTotalRequestedBuffers()
-                < spillingInfoProvider.getPoolSize() * releaseThreshold) {
+            HsSpillingInfoProvider spillingInfoProvider, int poolSize, Decision.Builder builder) {
+        if (spillingInfoProvider.getNumTotalRequestedBuffers() < poolSize * releaseThreshold) {
             // In case situation changed since onMemoryUsageChanged() returns Optional#empty()
             return;
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
index 22a9408a01f..4f45b491aa2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy.Decision;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -40,6 +41,9 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -71,6 +75,17 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
     private final Map<Integer, HsSubpartitionViewInternalOperations> subpartitionViewOperationsMap =
             new ConcurrentHashMap<>();
 
+    /**
+     * Currently, it is only used to regularly check the actual size of local buffer pool (the size
+     * will change dynamically due to the redistribution of network buffers). When the size of the
+     * buffer pool changes, it attempts to trigger the spilling strategy.
+     */
+    private final ScheduledExecutorService poolSizeChecker =
+            Executors.newSingleThreadScheduledExecutor(
+                    new ExecutorThreadFactory("hybrid-shuffle-pool-size-checker-executor"));
+
+    private final AtomicInteger poolSize;
+
     public HsMemoryDataManager(
             int numSubpartitions,
             int bufferSize,
@@ -78,7 +93,8 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
             HsSpillingStrategy spillStrategy,
             HsFileDataIndex fileDataIndex,
             Path dataFilePath,
-            BufferCompressor bufferCompressor)
+            BufferCompressor bufferCompressor,
+            long poolSizeCheckInterval)
             throws IOException {
         this.numSubpartitions = numSubpartitions;
         this.bufferPool = bufferPool;
@@ -99,6 +115,22 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
                             bufferCompressor,
                             this);
         }
+
+        poolSize = new AtomicInteger(this.bufferPool.getNumBuffers());
+
+        if (poolSizeCheckInterval > 0) {
+            poolSizeChecker.scheduleAtFixedRate(
+                    () -> {
+                        int newSize = this.bufferPool.getNumBuffers();
+                        int oldSize = poolSize.getAndSet(newSize);
+                        if (oldSize > newSize) {
+                            callWithLock(() -> spillStrategy.decideActionWithGlobalInfo(this));
+                        }
+                    },
+                    poolSizeCheckInterval,
+                    poolSizeCheckInterval,
+                    TimeUnit.MILLISECONDS);
+        }
     }
 
     // ------------------------------------
@@ -144,6 +176,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
         Decision decision = callWithLock(() -> spillStrategy.onResultPartitionClosed(this));
         handleDecision(Optional.of(decision));
         spiller.close();
+        poolSizeChecker.shutdown();
     }
 
     /**
@@ -168,7 +201,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
 
     @Override
     public int getPoolSize() {
-        return bufferPool.getNumBuffers();
+        return poolSize.get();
     }
 
     @Override
@@ -240,7 +273,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
     @Override
     public void onBufferFinished() {
         Optional<Decision> decision =
-                spillStrategy.onBufferFinished(numUnSpillBuffers.incrementAndGet());
+                spillStrategy.onBufferFinished(numUnSpillBuffers.incrementAndGet(), getPoolSize());
         handleDecision(decision);
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
index 5d537aa7466..9b75f1e3724 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
@@ -127,7 +127,8 @@ public class HsResultPartition extends ResultPartition {
                         getSpillingStrategy(hybridShuffleConfiguration),
                         dataIndex,
                         dataFilePath,
-                        bufferCompressor);
+                        bufferCompressor,
+                        hybridShuffleConfiguration.getBufferPoolSizeCheckIntervalMs());
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
index dcd53393bdf..dc356cb20d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
@@ -45,7 +45,7 @@ public class HsSelectiveSpillingStrategy implements HsSpillingStrategy {
     // For the case of buffer finished, there is no need to take action for
     // HsSelectiveSpillingStrategy.
     @Override
-    public Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers) {
+    public Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers, int currentPoolSize) {
         return Optional.of(Decision.NO_ACTION);
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
index fb4a5ab58d5..6d3a15d427a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
@@ -51,7 +51,7 @@ public interface HsSpillingStrategy {
      * @return A {@link Decision} based on the provided information, or {@link Optional#empty()} if
      *     the decision cannot be made, which indicates global information is needed.
      */
-    Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers);
+    Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers, int currentPoolSize);
 
     /**
      * Make a decision when a buffer is consumed.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
index 18cadb71b05..f3df0b2ab5c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
@@ -30,12 +30,14 @@ public class HybridShuffleConfiguration {
 
     private static final float DEFAULT_SELECTIVE_STRATEGY_SPILL_BUFFER_RATIO = 0.4f;
 
-    private static final int DEFAULT_FULL_STRATEGY_NUM_BUFFERS_TRIGGER_SPILLED = 10;
+    private static final float DEFAULT_FULL_STRATEGY_NUM_BUFFERS_TRIGGER_SPILLED_RATIO = 0.5f;
 
     private static final float DEFAULT_FULL_STRATEGY_RELEASE_THRESHOLD = 0.7f;
 
     private static final float DEFAULT_FULL_STRATEGY_RELEASE_BUFFER_RATIO = 0.4f;
 
+    private static final long DEFAULT_BUFFER_POLL_SIZE_CHECK_INTERVAL_MS = 1000;
+
     private static final SpillingStrategyType DEFAULT_SPILLING_STRATEGY_NAME =
             SpillingStrategyType.FULL;
 
@@ -57,31 +59,36 @@ public class HybridShuffleConfiguration {
     // ----------------------------------------
     //        Full Spilling Strategy
     // ----------------------------------------
-    private final int fullStrategyNumBuffersTriggerSpilling;
+    private final float fullStrategyNumBuffersTriggerSpillingRatio;
 
     private final float fullStrategyReleaseThreshold;
 
     private final float fullStrategyReleaseBufferRatio;
 
+    private final long bufferPoolSizeCheckIntervalMs;
+
     private HybridShuffleConfiguration(
             int maxBuffersReadAhead,
             Duration bufferRequestTimeout,
             int maxRequestedBuffers,
             float selectiveStrategySpillThreshold,
             float selectiveStrategySpillBufferRatio,
-            int fullStrategyNumBuffersTriggerSpilling,
+            float fullStrategyNumBuffersTriggerSpillingRatio,
             float fullStrategyReleaseThreshold,
             float fullStrategyReleaseBufferRatio,
-            SpillingStrategyType spillingStrategyType) {
+            SpillingStrategyType spillingStrategyType,
+            long bufferPoolSizeCheckIntervalMs) {
         this.maxBuffersReadAhead = maxBuffersReadAhead;
         this.bufferRequestTimeout = bufferRequestTimeout;
         this.maxRequestedBuffers = maxRequestedBuffers;
         this.selectiveStrategySpillThreshold = selectiveStrategySpillThreshold;
         this.selectiveStrategySpillBufferRatio = selectiveStrategySpillBufferRatio;
-        this.fullStrategyNumBuffersTriggerSpilling = fullStrategyNumBuffersTriggerSpilling;
+        this.fullStrategyNumBuffersTriggerSpillingRatio =
+                fullStrategyNumBuffersTriggerSpillingRatio;
         this.fullStrategyReleaseThreshold = fullStrategyReleaseThreshold;
         this.fullStrategyReleaseBufferRatio = fullStrategyReleaseBufferRatio;
         this.spillingStrategyType = spillingStrategyType;
+        this.bufferPoolSizeCheckIntervalMs = bufferPoolSizeCheckIntervalMs;
     }
 
     public static Builder builder(int numSubpartitions, int numBuffersPerRequest) {
@@ -127,11 +134,11 @@ public class HybridShuffleConfiguration {
     }
 
     /**
-     * When the number of unSpilled buffers equal to this value, trigger the spilling operation.
-     * Used by {@link HsFullSpillingStrategy}.
+     * When the number of unSpilled buffers equal to this ratio times pool size, trigger the
+     * spilling operation. Used by {@link HsFullSpillingStrategy}.
      */
-    public int getFullStrategyNumBuffersTriggerSpilling() {
-        return fullStrategyNumBuffersTriggerSpilling;
+    public float getFullStrategyNumBuffersTriggerSpillingRatio() {
+        return fullStrategyNumBuffersTriggerSpillingRatio;
     }
 
     /**
@@ -147,6 +154,11 @@ public class HybridShuffleConfiguration {
         return fullStrategyReleaseBufferRatio;
     }
 
+    /** Check interval of buffer pool's size. */
+    public long getBufferPoolSizeCheckIntervalMs() {
+        return bufferPoolSizeCheckIntervalMs;
+    }
+
     /** Type of {@link HsSpillingStrategy}. */
     public enum SpillingStrategyType {
         FULL,
@@ -164,13 +176,15 @@ public class HybridShuffleConfiguration {
         private float selectiveStrategySpillBufferRatio =
                 DEFAULT_SELECTIVE_STRATEGY_SPILL_BUFFER_RATIO;
 
-        private int fullStrategyNumBuffersTriggerSpilling =
-                DEFAULT_FULL_STRATEGY_NUM_BUFFERS_TRIGGER_SPILLED;
+        private float fullStrategyNumBuffersTriggerSpillingRatio =
+                DEFAULT_FULL_STRATEGY_NUM_BUFFERS_TRIGGER_SPILLED_RATIO;
 
         private float fullStrategyReleaseThreshold = DEFAULT_FULL_STRATEGY_RELEASE_THRESHOLD;
 
         private float fullStrategyReleaseBufferRatio = DEFAULT_FULL_STRATEGY_RELEASE_BUFFER_RATIO;
 
+        private long bufferPoolSizeCheckIntervalMs = DEFAULT_BUFFER_POLL_SIZE_CHECK_INTERVAL_MS;
+
         private SpillingStrategyType spillingStrategyType = DEFAULT_SPILLING_STRATEGY_NAME;
 
         private final int numSubpartitions;
@@ -203,9 +217,10 @@ public class HybridShuffleConfiguration {
             return this;
         }
 
-        public Builder setFullStrategyNumBuffersTriggerSpilling(
-                int fullStrategyNumBuffersTriggerSpilling) {
-            this.fullStrategyNumBuffersTriggerSpilling = fullStrategyNumBuffersTriggerSpilling;
+        public Builder setFullStrategyNumBuffersTriggerSpillingRatio(
+                float fullStrategyNumBuffersTriggerSpillingRatio) {
+            this.fullStrategyNumBuffersTriggerSpillingRatio =
+                    fullStrategyNumBuffersTriggerSpillingRatio;
             return this;
         }
 
@@ -224,6 +239,11 @@ public class HybridShuffleConfiguration {
             return this;
         }
 
+        public Builder setBufferPoolSizeCheckIntervalMs(long bufferPoolSizeCheckIntervalMs) {
+            this.bufferPoolSizeCheckIntervalMs = bufferPoolSizeCheckIntervalMs;
+            return this;
+        }
+
         public HybridShuffleConfiguration build() {
             return new HybridShuffleConfiguration(
                     maxBuffersReadAhead,
@@ -231,10 +251,11 @@ public class HybridShuffleConfiguration {
                     Math.max(2 * numBuffersPerRequest, numSubpartitions),
                     selectiveStrategySpillThreshold,
                     selectiveStrategySpillBufferRatio,
-                    fullStrategyNumBuffersTriggerSpilling,
+                    fullStrategyNumBuffersTriggerSpillingRatio,
                     fullStrategyReleaseThreshold,
                     fullStrategyReleaseBufferRatio,
-                    spillingStrategyType);
+                    spillingStrategyType,
+                    bufferPoolSizeCheckIntervalMs);
         }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
index 44701cbab76..3602b15b2d1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
@@ -38,7 +38,7 @@ import static org.assertj.core.api.Assertions.entry;
 class HsFullSpillingStrategyTest {
     public static final int NUM_SUBPARTITIONS = 2;
 
-    public static final int NUM_BUFFERS_TRIGGER_SPILLING = 2;
+    public static final float NUM_BUFFERS_TRIGGER_SPILLING_RATIO = 0.2f;
 
     public static final float FULL_SPILL_RELEASE_THRESHOLD = 0.8f;
 
@@ -47,24 +47,31 @@ class HsFullSpillingStrategyTest {
     private final HsSpillingStrategy spillStrategy =
             new HsFullSpillingStrategy(
                     HybridShuffleConfiguration.builder(NUM_SUBPARTITIONS, 1)
-                            .setFullStrategyNumBuffersTriggerSpilling(NUM_BUFFERS_TRIGGER_SPILLING)
+                            .setFullStrategyNumBuffersTriggerSpillingRatio(
+                                    NUM_BUFFERS_TRIGGER_SPILLING_RATIO)
                             .setFullStrategyReleaseThreshold(FULL_SPILL_RELEASE_THRESHOLD)
                             .setFullStrategyReleaseBufferRatio(FULL_SPILL_RELEASE_RATIO)
                             .build());
 
     @Test
     void testOnBufferFinishedUnSpillBufferBelowThreshold() {
+        final int poolSize = 10;
         Optional<Decision> finishedDecision =
-                spillStrategy.onBufferFinished(NUM_BUFFERS_TRIGGER_SPILLING - 1);
+                spillStrategy.onBufferFinished(
+                        (int) (poolSize * NUM_BUFFERS_TRIGGER_SPILLING_RATIO) - 1, poolSize);
         assertThat(finishedDecision).hasValue(Decision.NO_ACTION);
     }
 
     @Test
     void testOnBufferFinishedUnSpillBufferEqualToOrGreatThenThreshold() {
+        final int poolSize = 10;
         Optional<Decision> finishedDecision =
-                spillStrategy.onBufferFinished(NUM_BUFFERS_TRIGGER_SPILLING);
+                spillStrategy.onBufferFinished(
+                        (int) (poolSize * NUM_BUFFERS_TRIGGER_SPILLING_RATIO), poolSize);
         assertThat(finishedDecision).isNotPresent();
-        finishedDecision = spillStrategy.onBufferFinished(NUM_BUFFERS_TRIGGER_SPILLING + 1);
+        finishedDecision =
+                spillStrategy.onBufferFinished(
+                        (int) (poolSize * NUM_BUFFERS_TRIGGER_SPILLING_RATIO) + 1, poolSize);
         assertThat(finishedDecision).isNotPresent();
     }
 
@@ -125,7 +132,8 @@ class HsFullSpillingStrategyTest {
                         .addConsumedBuffers(subpartition1, Arrays.asList(0, 1))
                         .addSpillBuffers(subpartition2, Arrays.asList(1, 2, 3))
                         .addConsumedBuffers(subpartition2, Arrays.asList(0, 1))
-                        .setGetNumTotalUnSpillBuffersSupplier(() -> NUM_BUFFERS_TRIGGER_SPILLING)
+                        .setGetNumTotalUnSpillBuffersSupplier(
+                                () -> (int) (10 * NUM_BUFFERS_TRIGGER_SPILLING_RATIO))
                         .setGetNumTotalRequestedBuffersSupplier(() -> 10)
                         .setGetPoolSizeSupplier(() -> 10)
                         .setGetNextBufferIndexToConsumeSupplier(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java
index 4cf1bc88c94..773b847ecc1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java
@@ -65,7 +65,7 @@ class HsMemoryDataManagerTest {
         HsSpillingStrategy spillingStrategy =
                 TestingSpillingStrategy.builder()
                         .setOnBufferFinishedFunction(
-                                (numTotalUnSpillBuffers) -> {
+                                (numTotalUnSpillBuffers, currentPoolSize) -> {
                                     finishedBuffers.incrementAndGet();
                                     return Optional.of(Decision.NO_ACTION);
                                 })
@@ -120,7 +120,7 @@ class HsMemoryDataManagerTest {
         HsSpillingStrategy spillingStrategy =
                 TestingSpillingStrategy.builder()
                         .setOnBufferFinishedFunction(
-                                (numFinishedBuffers) -> {
+                                (numFinishedBuffers, poolSize) -> {
                                     if (numFinishedBuffers < numFinishedBufferToTriggerDecision) {
                                         return Optional.of(Decision.NO_ACTION);
                                     }
@@ -160,7 +160,7 @@ class HsMemoryDataManagerTest {
         HsSpillingStrategy spillingStrategy =
                 TestingSpillingStrategy.builder()
                         .setOnBufferFinishedFunction(
-                                (finishedBuffer) -> {
+                                (finishedBuffer, poolSize) -> {
                                     // return empty optional to trigger global decision.
                                     return Optional.empty();
                                 })
@@ -192,6 +192,34 @@ class HsMemoryDataManagerTest {
         assertThat(resultPartitionReleaseFuture).isCompleted();
     }
 
+    @Test
+    void testPoolSizeCheck() throws Exception {
+        final int requiredBuffers = 10;
+        final int maxBuffers = 100;
+        CompletableFuture<Void> triggerGlobalDecision = new CompletableFuture<>();
+
+        NetworkBufferPool networkBufferPool = new NetworkBufferPool(maxBuffers, bufferSize);
+        BufferPool bufferPool = networkBufferPool.createBufferPool(requiredBuffers, maxBuffers);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(maxBuffers);
+
+        HsSpillingStrategy spillingStrategy =
+                TestingSpillingStrategy.builder()
+                        .setDecideActionWithGlobalInfoFunction(
+                                (spillingInfoProvider) -> {
+                                    assertThat(spillingInfoProvider.getPoolSize())
+                                            .isEqualTo(requiredBuffers);
+                                    triggerGlobalDecision.complete(null);
+                                    return Decision.NO_ACTION;
+                                })
+                        .build();
+
+        createMemoryDataManager(spillingStrategy, bufferPool);
+        networkBufferPool.createBufferPool(maxBuffers - requiredBuffers, maxBuffers);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(requiredBuffers);
+
+        assertThat(triggerGlobalDecision).succeedsWithin(10, TimeUnit.SECONDS);
+    }
+
     private HsMemoryDataManager createMemoryDataManager(HsSpillingStrategy spillStrategy)
             throws Exception {
         return createMemoryDataManager(spillStrategy, new HsFileDataIndexImpl(NUM_SUBPARTITIONS));
@@ -201,6 +229,18 @@ class HsMemoryDataManagerTest {
             HsSpillingStrategy spillStrategy, HsFileDataIndex fileDataIndex) throws Exception {
         NetworkBufferPool networkBufferPool = new NetworkBufferPool(NUM_BUFFERS, bufferSize);
         BufferPool bufferPool = networkBufferPool.createBufferPool(poolSize, poolSize);
+        return createMemoryDataManager(bufferPool, spillStrategy, fileDataIndex);
+    }
+
+    private HsMemoryDataManager createMemoryDataManager(
+            HsSpillingStrategy spillingStrategy, BufferPool bufferPool) throws Exception {
+        return createMemoryDataManager(
+                bufferPool, spillingStrategy, new HsFileDataIndexImpl(NUM_SUBPARTITIONS));
+    }
+
+    private HsMemoryDataManager createMemoryDataManager(
+            BufferPool bufferPool, HsSpillingStrategy spillStrategy, HsFileDataIndex fileDataIndex)
+            throws Exception {
         HsMemoryDataManager memoryDataManager =
                 new HsMemoryDataManager(
                         NUM_SUBPARTITIONS,
@@ -209,7 +249,8 @@ class HsMemoryDataManagerTest {
                         spillStrategy,
                         fileDataIndex,
                         dataFilePath,
-                        null);
+                        null,
+                        1000);
         memoryDataManager.setOutputMetrics(createTestingOutputMetrics());
         return memoryDataManager;
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
index c1086b95f7d..826a7b243f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
@@ -41,7 +41,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
-import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration.SpillingStrategyType;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.util.IOUtils;
@@ -380,33 +379,6 @@ class HsResultPartitionTest {
         return ByteBuffer.wrap(dataWritten);
     }
 
-    private HsResultPartition createHsResultPartition(
-            int numSubpartitions, BufferPool bufferPool, int numBuffersTriggerSpilling)
-            throws IOException {
-        HsResultPartition hsResultPartition =
-                new HsResultPartition(
-                        "HsResultPartitionTest",
-                        0,
-                        new ResultPartitionID(),
-                        ResultPartitionType.HYBRID_FULL,
-                        numSubpartitions,
-                        numSubpartitions,
-                        readBufferPool,
-                        readIOExecutor,
-                        new ResultPartitionManager(),
-                        fileChannelManager.createChannel().getPath(),
-                        bufferSize,
-                        HybridShuffleConfiguration.builder(
-                                        numSubpartitions, readBufferPool.getNumBuffersPerRequest())
-                                .setSpillingStrategyType(SpillingStrategyType.FULL)
-                                .setFullStrategyNumBuffersTriggerSpilling(numBuffersTriggerSpilling)
-                                .build(),
-                        null,
-                        () -> bufferPool);
-        hsResultPartition.setup();
-        return hsResultPartition;
-    }
-
     private HsResultPartition createHsResultPartition(int numSubpartitions, BufferPool bufferPool)
             throws IOException {
         HsResultPartition hsResultPartition =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java
index dfeb7a0f425..04328fbc32b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java
@@ -49,7 +49,7 @@ class HsSelectiveSpillingStrategyTest {
 
     @Test
     void testOnBufferFinished() {
-        Optional<Decision> finishedDecision = spillStrategy.onBufferFinished(5);
+        Optional<Decision> finishedDecision = spillStrategy.onBufferFinished(5, 10);
         assertThat(finishedDecision).hasValue(Decision.NO_ACTION);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
index aedb0602c83..451ab8980da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
@@ -114,7 +114,8 @@ class HsSubpartitionViewTest {
                         spillingStrategy,
                         new HsFileDataIndexImpl(1),
                         dataFilePath.resolve(".data"),
-                        null);
+                        null,
+                        0);
         memoryDataManager.setOutputMetrics(createTestingOutputMetrics());
         HsDataView hsDataView = memoryDataManager.registerSubpartitionView(0, subpartitionView);
         subpartitionView.setMemoryDataView(hsDataView);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java
index 61a730fc5c8..48fff99db7d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java
@@ -26,7 +26,7 @@ import java.util.function.Function;
 public class TestingSpillingStrategy implements HsSpillingStrategy {
     private final BiFunction<Integer, Integer, Optional<Decision>> onMemoryUsageChangedFunction;
 
-    private final Function<Integer, Optional<Decision>> onBufferFinishedFunction;
+    private final BiFunction<Integer, Integer, Optional<Decision>> onBufferFinishedFunction;
 
     private final Function<BufferIndexAndChannel, Optional<Decision>> onBufferConsumedFunction;
 
@@ -36,7 +36,7 @@ public class TestingSpillingStrategy implements HsSpillingStrategy {
 
     private TestingSpillingStrategy(
             BiFunction<Integer, Integer, Optional<Decision>> onMemoryUsageChangedFunction,
-            Function<Integer, Optional<Decision>> onBufferFinishedFunction,
+            BiFunction<Integer, Integer, Optional<Decision>> onBufferFinishedFunction,
             Function<BufferIndexAndChannel, Optional<Decision>> onBufferConsumedFunction,
             Function<HsSpillingInfoProvider, Decision> decideActionWithGlobalInfoFunction,
             Function<HsSpillingInfoProvider, Decision> onResultPartitionClosedFunction) {
@@ -54,8 +54,8 @@ public class TestingSpillingStrategy implements HsSpillingStrategy {
     }
 
     @Override
-    public Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers) {
-        return onBufferFinishedFunction.apply(numTotalUnSpillBuffers);
+    public Optional<Decision> onBufferFinished(int numTotalUnSpillBuffers, int currentPoolSize) {
+        return onBufferFinishedFunction.apply(numTotalUnSpillBuffers, currentPoolSize);
     }
 
     @Override
@@ -82,8 +82,8 @@ public class TestingSpillingStrategy implements HsSpillingStrategy {
         private BiFunction<Integer, Integer, Optional<Decision>> onMemoryUsageChangedFunction =
                 (ignore1, ignore2) -> Optional.of(Decision.NO_ACTION);
 
-        private Function<Integer, Optional<Decision>> onBufferFinishedFunction =
-                (ignore) -> Optional.of(Decision.NO_ACTION);
+        private BiFunction<Integer, Integer, Optional<Decision>> onBufferFinishedFunction =
+                (ignore1, ignore2) -> Optional.of(Decision.NO_ACTION);
 
         private Function<BufferIndexAndChannel, Optional<Decision>> onBufferConsumedFunction =
                 (ignore) -> Optional.of(Decision.NO_ACTION);
@@ -103,7 +103,7 @@ public class TestingSpillingStrategy implements HsSpillingStrategy {
         }
 
         public Builder setOnBufferFinishedFunction(
-                Function<Integer, Optional<Decision>> onBufferFinishedFunction) {
+                BiFunction<Integer, Integer, Optional<Decision>> onBufferFinishedFunction) {
             this.onBufferFinishedFunction = onBufferFinishedFunction;
             return this;
         }