You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/06/21 07:56:01 UTC

[flink] branch master updated: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b60ee8e295 [FLINK-26762][network] Add the overdraft buffer in ResultPartition
0b60ee8e295 is described below

commit 0b60ee8e295baaade575d051e3fe9cc6a540cc5d
Author: fanrui <19...@gmail.com>
AuthorDate: Tue May 31 16:03:02 2022 +0800

    [FLINK-26762][network] Add the overdraft buffer in ResultPartition
---
 docs/content.zh/docs/ops/metrics.md                |   4 +-
 docs/content/docs/ops/metrics.md                   |   4 +-
 .../generated/all_taskmanager_network_section.html |   6 ++
 .../netty_shuffle_environment_configuration.html   |   6 ++
 .../NettyShuffleEnvironmentOptions.java            |  18 ++++
 .../io/network/NettyShuffleServiceFactory.java     |   3 +-
 .../io/network/buffer/BufferPoolFactory.java       |   4 +-
 .../runtime/io/network/buffer/LocalBufferPool.java |  93 +++++++++++++-----
 .../io/network/buffer/NetworkBufferPool.java       |  18 +++-
 .../network/partition/ResultPartitionFactory.java  |  12 ++-
 .../NettyShuffleEnvironmentConfiguration.java      |  23 ++++-
 .../io/network/NettyShuffleEnvironmentBuilder.java |  11 ++-
 .../api/writer/RecordWriterDelegateTest.java       |   2 +-
 .../io/network/api/writer/RecordWriterTest.java    |   2 +-
 .../io/network/buffer/LocalBufferPoolTest.java     | 105 ++++++++++++++++++++-
 .../network/partition/ResultPartitionBuilder.java  |  10 +-
 .../partition/ResultPartitionFactoryTest.java      |   3 +-
 .../io/network/partition/ResultPartitionTest.java  |   2 +-
 .../partition/consumer/LocalInputChannelTest.java  |   3 +-
 .../runtime/state/ChannelPersistenceITCase.java    |   3 +-
 20 files changed, 286 insertions(+), 46 deletions(-)

diff --git a/docs/content.zh/docs/ops/metrics.md b/docs/content.zh/docs/ops/metrics.md
index 7f48408c22e..610df4d79b7 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -840,7 +840,7 @@ Deprecated: use [Default shuffle service metrics](#default-shuffle-service)
     </tr>
     <tr>
       <td>outPoolUsage</td>
-      <td>An estimate of the output buffers usage.</td>
+      <td>An estimate of the output buffers usage. The pool usage can be > 100% if overdraft buffers are being used.</td>
       <td>Gauge</td>
     </tr>
     <tr>
@@ -960,7 +960,7 @@ Metrics related to data exchange between task executors using netty network comm
     </tr>
     <tr>
       <td>outPoolUsage</td>
-      <td>An estimate of the output buffers usage.</td>
+      <td>An estimate of the output buffers usage. The pool usage can be > 100% if overdraft buffers are being used.</td>
       <td>Gauge</td>
     </tr>
     <tr>
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index 0659ba2f4f9..c46a6550fdf 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -840,7 +840,7 @@ Deprecated: use [Default shuffle service metrics](#default-shuffle-service)
     </tr>
     <tr>
       <td>outPoolUsage</td>
-      <td>An estimate of the output buffers usage.</td>
+      <td>An estimate of the output buffers usage. The pool usage can be > 100% if overdraft buffers are being used.</td>
       <td>Gauge</td>
     </tr>
     <tr>
@@ -960,7 +960,7 @@ Metrics related to data exchange between task executors using netty network comm
     </tr>
     <tr>
       <td>outPoolUsage</td>
-      <td>An estimate of the output buffers usage.</td>
+      <td>An estimate of the output buffers usage. The pool usage can be > 100% if overdraft buffers are being used.</td>
       <td>Gauge</td>
     </tr>
     <tr>
diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
index dc8c9881446..a6705820858 100644
--- a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
+++ b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
@@ -80,6 +80,12 @@
             <td>Integer</td>
             <td>Number of max buffers that can be used for each channel. If a channel exceeds the number of max buffers, it will make the task become unavailable, cause the back pressure and block the data processing. This might speed up checkpoint alignment by preventing excessive growth of the buffered in-flight data in case of data skew and high number of configured floating buffers. This limit is not strictly guaranteed, and can be ignored by things like flatMap operators, records sp [...]
         </tr>
+        <tr>
+            <td><h5>taskmanager.network.memory.max-overdraft-buffers-per-gate</h5></td>
+            <td style="word-wrap: break-word;">5</td>
+            <td>Integer</td>
+            <td>Number of max overdraft network buffers to use for each ResultPartition. The overdraft buffers will be used when the subtask cannot apply to the normal buffers  due to back pressure, while subtask is performing an action that can not be interrupted in the middle,  like serializing a large record, flatMap operator producing multiple records for one single input record or processing time timer producing large output. In situations like that system will allow subtask to requ [...]
+        </tr>
         <tr>
             <td><h5>taskmanager.network.netty.client.connectTimeoutSec</h5></td>
             <td style="word-wrap: break-word;">120</td>
diff --git a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
index 0f4a0ebb069..c9f19ff76ba 100644
--- a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
+++ b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
@@ -68,6 +68,12 @@
             <td>Integer</td>
             <td>Number of max buffers that can be used for each channel. If a channel exceeds the number of max buffers, it will make the task become unavailable, cause the back pressure and block the data processing. This might speed up checkpoint alignment by preventing excessive growth of the buffered in-flight data in case of data skew and high number of configured floating buffers. This limit is not strictly guaranteed, and can be ignored by things like flatMap operators, records sp [...]
         </tr>
+        <tr>
+            <td><h5>taskmanager.network.memory.max-overdraft-buffers-per-gate</h5></td>
+            <td style="word-wrap: break-word;">5</td>
+            <td>Integer</td>
+            <td>Number of max overdraft network buffers to use for each ResultPartition. The overdraft buffers will be used when the subtask cannot apply to the normal buffers  due to back pressure, while subtask is performing an action that can not be interrupted in the middle,  like serializing a large record, flatMap operator producing multiple records for one single input record or processing time timer producing large output. In situations like that system will allow subtask to requ [...]
+        </tr>
         <tr>
             <td><h5>taskmanager.network.netty.client.connectTimeoutSec</h5></td>
             <td style="word-wrap: break-word;">120</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index 97820469660..0286bc27254 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -274,6 +274,24 @@ public class NettyShuffleEnvironmentOptions {
                                     + " and can be ignored by things like flatMap operators, records spanning multiple buffers or single timer"
                                     + " producing large amount of data.");
 
+    /** Number of max overdraft network buffers to use for each ResultPartition. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> NETWORK_MAX_OVERDRAFT_BUFFERS_PER_GATE =
+            key("taskmanager.network.memory.max-overdraft-buffers-per-gate")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Number of max overdraft network buffers to use for each ResultPartition. The overdraft buffers"
+                                    + " will be used when the subtask cannot apply to the normal buffers  due to back pressure,"
+                                    + " while subtask is performing an action that can not be interrupted in the middle,  like"
+                                    + " serializing a large record, flatMap operator producing multiple records for one single"
+                                    + " input record or processing time timer producing large output. In situations like that"
+                                    + " system will allow subtask to request overdraft buffers, so that the subtask can finish"
+                                    + " such uninterruptible action, without blocking unaligned checkpoints for long period of"
+                                    + " time. Overdraft buffers are provided on best effort basis only if the system has some"
+                                    + " unused buffers available. Subtask that has used overdraft buffers won't be allowed to"
+                                    + " process any more records until the overdraft buffers are returned to the pool.");
+
     /** The timeout for requesting exclusive buffers for each channel. */
     @Documentation.ExcludeFromDocumentation(
             "This option is purely implementation related, and may be removed as the implementation changes.")
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index 05e39322629..e3789924fa8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -198,7 +198,8 @@ public class NettyShuffleServiceFactory
                         config.getMaxBuffersPerChannel(),
                         config.sortShuffleMinBuffers(),
                         config.sortShuffleMinParallelism(),
-                        config.isSSLEnabled());
+                        config.isSSLEnabled(),
+                        config.getMaxOverdraftBuffersPerGate());
 
         SingleInputGateFactory singleInputGateFactory =
                 new SingleInputGateFactory(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
index 3947e331835..04a0cf2d60a 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
@@ -44,12 +44,14 @@ public interface BufferPoolFactory {
      * @param maxUsedBuffers maximum number of network buffers this pool offers
      * @param numSubpartitions number of subpartitions in this pool
      * @param maxBuffersPerChannel maximum number of buffers to use for each channel
+     * @param maxOverdraftBuffersPerGate maximum number of overdraft buffers to use for each gate
      */
     BufferPool createBufferPool(
             int numRequiredBuffers,
             int maxUsedBuffers,
             int numSubpartitions,
-            int maxBuffersPerChannel)
+            int maxBuffersPerChannel,
+            int maxOverdraftBuffersPerGate)
             throws IOException;
 
     /** Destroy callback for updating factory book keeping. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 0d5068c02f3..a5c54d3ea7a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.buffer;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.util.ExceptionUtils;
@@ -118,6 +119,11 @@ class LocalBufferPool implements BufferPool {
     @GuardedBy("availableMemorySegments")
     private int unavailableSubpartitionsCount = 0;
 
+    private final int maxOverdraftBuffersPerGate;
+
+    @GuardedBy("availableMemorySegments")
+    private int numberOfRequestedOverdraftMemorySegments;
+
     @GuardedBy("availableMemorySegments")
     private boolean isDestroyed;
 
@@ -141,7 +147,8 @@ class LocalBufferPool implements BufferPool {
                 numberOfRequiredMemorySegments,
                 Integer.MAX_VALUE,
                 0,
-                Integer.MAX_VALUE);
+                Integer.MAX_VALUE,
+                0);
     }
 
     /**
@@ -162,7 +169,8 @@ class LocalBufferPool implements BufferPool {
                 numberOfRequiredMemorySegments,
                 maxNumberOfMemorySegments,
                 0,
-                Integer.MAX_VALUE);
+                Integer.MAX_VALUE,
+                0);
     }
 
     /**
@@ -174,13 +182,15 @@ class LocalBufferPool implements BufferPool {
      * @param maxNumberOfMemorySegments maximum number of network buffers to allocate
      * @param numberOfSubpartitions number of subpartitions
      * @param maxBuffersPerChannel maximum number of buffers to use for each channel
+     * @param maxOverdraftBuffersPerGate maximum number of overdraft buffers to use for each gate
      */
     LocalBufferPool(
             NetworkBufferPool networkBufferPool,
             int numberOfRequiredMemorySegments,
             int maxNumberOfMemorySegments,
             int numberOfSubpartitions,
-            int maxBuffersPerChannel)
+            int maxBuffersPerChannel,
+            int maxOverdraftBuffersPerGate)
             throws IOException {
         checkArgument(
                 numberOfRequiredMemorySegments > 0,
@@ -208,6 +218,10 @@ class LocalBufferPool implements BufferPool {
                     maxBuffersPerChannel > 0,
                     "Maximum number of buffers for each channel (%s) should be larger than 0.",
                     maxBuffersPerChannel);
+            checkArgument(
+                    maxOverdraftBuffersPerGate >= 0,
+                    "Maximum number of overdraft buffers for each gate (%s) should not be less than 0.",
+                    maxOverdraftBuffersPerGate);
         }
 
         this.subpartitionBuffersCount = new int[numberOfSubpartitions];
@@ -216,6 +230,7 @@ class LocalBufferPool implements BufferPool {
             subpartitionBufferRecyclers[i] = new SubpartitionBufferRecycler(i, this);
         }
         this.maxBuffersPerChannel = maxBuffersPerChannel;
+        this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
 
         // Lock is only taken, because #checkAvailability asserts it. It's a small penalty for
         // thread safety.
@@ -282,6 +297,11 @@ class LocalBufferPool implements BufferPool {
         }
     }
 
+    @VisibleForTesting
+    public int getNumberOfRequestedOverdraftMemorySegments() {
+        return numberOfRequestedOverdraftMemorySegments;
+    }
+
     @Override
     public int getNumberOfAvailableMemorySegments() {
         synchronized (availableMemorySegments) {
@@ -298,7 +318,11 @@ class LocalBufferPool implements BufferPool {
 
     @Override
     public int bestEffortGetNumOfUsedBuffers() {
-        return Math.max(0, numberOfRequestedMemorySegments - availableMemorySegments.size());
+        return Math.max(
+                0,
+                numberOfRequestedMemorySegments
+                        + numberOfRequestedOverdraftMemorySegments
+                        - availableMemorySegments.size());
     }
 
     @Override
@@ -372,7 +396,11 @@ class LocalBufferPool implements BufferPool {
         synchronized (availableMemorySegments) {
             checkDestroyed();
 
-            segment = availableMemorySegments.poll();
+            if (availableMemorySegments.isEmpty()) {
+                segment = requestOverdraftMemorySegmentFromGlobal();
+            } else {
+                segment = availableMemorySegments.poll();
+            }
 
             if (segment == null) {
                 return null;
@@ -424,6 +452,24 @@ class LocalBufferPool implements BufferPool {
         return false;
     }
 
+    private MemorySegment requestOverdraftMemorySegmentFromGlobal() {
+        assert Thread.holdsLock(availableMemorySegments);
+
+        if (numberOfRequestedOverdraftMemorySegments >= maxOverdraftBuffersPerGate) {
+            return null;
+        }
+
+        checkState(
+                !isDestroyed,
+                "Destroyed buffer pools should never acquire segments - this will lead to buffer leaks.");
+
+        MemorySegment segment = networkBufferPool.requestPooledMemorySegment();
+        if (segment != null) {
+            numberOfRequestedOverdraftMemorySegments++;
+        }
+        return segment;
+    }
+
     /**
      * Tries to obtain a buffer from global pool as soon as one pool is available. Note that
      * multiple {@link LocalBufferPool}s might wait on the future of the global pool, hence this
@@ -466,24 +512,26 @@ class LocalBufferPool implements BufferPool {
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;
+        return !availableMemorySegments.isEmpty()
+                && unavailableSubpartitionsCount == 0
+                && numberOfRequestedOverdraftMemorySegments == 0;
     }
 
     private boolean checkAvailability() {
         assert Thread.holdsLock(availableMemorySegments);
 
         if (!availableMemorySegments.isEmpty()) {
-            return unavailableSubpartitionsCount == 0;
+            return shouldBeAvailable();
         }
-        if (!isRequestedSizeReached()) {
-            if (requestMemorySegmentFromGlobal()) {
-                return unavailableSubpartitionsCount == 0;
-            } else {
-                requestMemorySegmentFromGlobalWhenAvailable();
-                return shouldBeAvailable();
-            }
+        if (isRequestedSizeReached()) {
+            return false;
         }
-        return false;
+
+        // No local segments left: request one from the global pool now, or once it has one.
+        if (!requestMemorySegmentFromGlobal()) {
+            requestMemorySegmentFromGlobalWhenAvailable();
+        }
+        return shouldBeAvailable();
     }
 
     private void checkConsistentAvailability() {
@@ -518,10 +566,7 @@ class LocalBufferPool implements BufferPool {
                     listener = registeredListeners.poll();
                     if (listener == null) {
                         availableMemorySegments.add(segment);
-                        // only need to check unavailableSubpartitionsCount here because
-                        // availableMemorySegments is not empty
-                        if (!availabilityHelper.isApproximatelyAvailable()
-                                && unavailableSubpartitionsCount == 0) {
+                        if (!availabilityHelper.isApproximatelyAvailable() && shouldBeAvailable()) {
                             toNotify = availabilityHelper.getUnavailableToResetAvailable();
                         }
                         break;
@@ -658,7 +703,12 @@ class LocalBufferPool implements BufferPool {
     private void returnMemorySegment(MemorySegment segment) {
         assert Thread.holdsLock(availableMemorySegments);
 
-        numberOfRequestedMemorySegments--;
+        // When using the overdraft buffer, return the overdraft buffer first.
+        if (numberOfRequestedOverdraftMemorySegments > 0) {
+            numberOfRequestedOverdraftMemorySegments--;
+        } else {
+            numberOfRequestedMemorySegments--;
+        }
         networkBufferPool.recyclePooledMemorySegment(segment);
     }
 
@@ -676,7 +726,8 @@ class LocalBufferPool implements BufferPool {
     }
 
     private boolean hasExcessBuffers() {
-        return numberOfRequestedMemorySegments > currentPoolSize;
+        return numberOfRequestedOverdraftMemorySegments > 0
+                || numberOfRequestedMemorySegments > currentPoolSize;
     }
 
     private boolean isRequestedSizeReached() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 23bfb723b74..331d589574b 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -448,7 +448,8 @@ public class NetworkBufferPool
     @Override
     public BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers)
             throws IOException {
-        return internalCreateBufferPool(numRequiredBuffers, maxUsedBuffers, 0, Integer.MAX_VALUE);
+        return internalCreateBufferPool(
+                numRequiredBuffers, maxUsedBuffers, 0, Integer.MAX_VALUE, 0);
     }
 
     @Override
@@ -456,17 +457,23 @@ public class NetworkBufferPool
             int numRequiredBuffers,
             int maxUsedBuffers,
             int numSubpartitions,
-            int maxBuffersPerChannel)
+            int maxBuffersPerChannel,
+            int maxOverdraftBuffersPerGate)
             throws IOException {
         return internalCreateBufferPool(
-                numRequiredBuffers, maxUsedBuffers, numSubpartitions, maxBuffersPerChannel);
+                numRequiredBuffers,
+                maxUsedBuffers,
+                numSubpartitions,
+                maxBuffersPerChannel,
+                maxOverdraftBuffersPerGate);
     }
 
     private BufferPool internalCreateBufferPool(
             int numRequiredBuffers,
             int maxUsedBuffers,
             int numSubpartitions,
-            int maxBuffersPerChannel)
+            int maxBuffersPerChannel,
+            int maxOverdraftBuffersPerGate)
             throws IOException {
 
         // It is necessary to use a separate lock from the one used for buffer
@@ -498,7 +505,8 @@ public class NetworkBufferPool
                             numRequiredBuffers,
                             maxUsedBuffers,
                             numSubpartitions,
-                            maxBuffersPerChannel);
+                            maxBuffersPerChannel,
+                            maxOverdraftBuffersPerGate);
 
             allBufferPools.add(localBufferPool);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index f8abda3d864..e0c6bba9a05 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -75,6 +75,8 @@ public class ResultPartitionFactory {
 
     private final boolean sslEnabled;
 
+    private final int maxOverdraftBuffersPerGate;
+
     public ResultPartitionFactory(
             ResultPartitionManager partitionManager,
             FileChannelManager channelManager,
@@ -90,7 +92,8 @@ public class ResultPartitionFactory {
             int maxBuffersPerChannel,
             int sortShuffleMinBuffers,
             int sortShuffleMinParallelism,
-            boolean sslEnabled) {
+            boolean sslEnabled,
+            int maxOverdraftBuffersPerGate) {
 
         this.partitionManager = partitionManager;
         this.channelManager = channelManager;
@@ -107,6 +110,7 @@ public class ResultPartitionFactory {
         this.sortShuffleMinBuffers = sortShuffleMinBuffers;
         this.sortShuffleMinParallelism = sortShuffleMinParallelism;
         this.sslEnabled = sslEnabled;
+        this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
     }
 
     public ResultPartition create(
@@ -278,7 +282,11 @@ public class ResultPartitionFactory {
                             type);
 
             return bufferPoolFactory.createBufferPool(
-                    pair.getLeft(), pair.getRight(), numberOfSubpartitions, maxBuffersPerChannel);
+                    pair.getLeft(),
+                    pair.getRight(),
+                    numberOfSubpartitions,
+                    maxBuffersPerChannel,
+                    maxOverdraftBuffersPerGate);
         };
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index ac371a35362..869f655a801 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -97,6 +97,8 @@ public class NettyShuffleEnvironmentConfiguration {
 
     private final boolean connectionReuseEnabled;
 
+    private final int maxOverdraftBuffersPerGate;
+
     public NettyShuffleEnvironmentConfiguration(
             int numNetworkBuffers,
             int networkBufferSize,
@@ -117,7 +119,8 @@ public class NettyShuffleEnvironmentConfiguration {
             int sortShuffleMinParallelism,
             BufferDebloatConfiguration debloatConfiguration,
             int maxNumberOfConnections,
-            boolean connectionReuseEnabled) {
+            boolean connectionReuseEnabled,
+            int maxOverdraftBuffersPerGate) {
 
         this.numNetworkBuffers = numNetworkBuffers;
         this.networkBufferSize = networkBufferSize;
@@ -139,6 +142,7 @@ public class NettyShuffleEnvironmentConfiguration {
         this.debloatConfiguration = debloatConfiguration;
         this.maxNumberOfConnections = maxNumberOfConnections;
         this.connectionReuseEnabled = connectionReuseEnabled;
+        this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
     }
 
     // ------------------------------------------------------------------------
@@ -227,6 +231,10 @@ public class NettyShuffleEnvironmentConfiguration {
         return maxNumberOfConnections;
     }
 
+    public int getMaxOverdraftBuffersPerGate() {
+        return maxOverdraftBuffersPerGate;
+    }
+
     // ------------------------------------------------------------------------
 
     /**
@@ -278,6 +286,10 @@ public class NettyShuffleEnvironmentConfiguration {
                 configuration.getInteger(
                         NettyShuffleEnvironmentOptions.NETWORK_MAX_BUFFERS_PER_CHANNEL);
 
+        int maxOverdraftBuffersPerGate =
+                configuration.getInteger(
+                        NettyShuffleEnvironmentOptions.NETWORK_MAX_OVERDRAFT_BUFFERS_PER_GATE);
+
         long batchShuffleReadMemoryBytes =
                 configuration.get(TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY).getBytes();
 
@@ -342,7 +354,8 @@ public class NettyShuffleEnvironmentConfiguration {
                 sortShuffleMinParallelism,
                 BufferDebloatConfiguration.fromConfiguration(configuration),
                 maxNumConnections,
-                connectionReuseEnabled);
+                connectionReuseEnabled,
+                maxOverdraftBuffersPerGate);
     }
 
     /**
@@ -481,6 +494,7 @@ public class NettyShuffleEnvironmentConfiguration {
         result = 31 * result + sortShuffleMinParallelism;
         result = 31 * result + maxNumberOfConnections;
         result = 31 * result + (connectionReuseEnabled ? 1 : 0);
+        result = 31 * result + maxOverdraftBuffersPerGate;
         return result;
     }
 
@@ -513,7 +527,8 @@ public class NettyShuffleEnvironmentConfiguration {
                     && this.maxBuffersPerChannel == that.maxBuffersPerChannel
                     && Objects.equals(this.compressionCodec, that.compressionCodec)
                     && this.maxNumberOfConnections == that.maxNumberOfConnections
-                    && this.connectionReuseEnabled == that.connectionReuseEnabled;
+                    && this.connectionReuseEnabled == that.connectionReuseEnabled
+                    && this.maxOverdraftBuffersPerGate == that.maxOverdraftBuffersPerGate;
         }
     }
 
@@ -554,6 +569,8 @@ public class NettyShuffleEnvironmentConfiguration {
                 + maxNumberOfConnections
                 + ", connectionReuseEnabled="
                 + connectionReuseEnabled
+                + ", maxOverdraftBuffersPerGate="
+                + maxOverdraftBuffersPerGate
                 + '}';
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index ed1e80b4473..5b6bd50ee21 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -68,6 +68,8 @@ public class NettyShuffleEnvironmentBuilder {
 
     private boolean connectionReuseEnabled = true;
 
+    private int maxOverdraftBuffersPerGate = 0;
+
     private String compressionCodec = "LZ4";
 
     private ResourceID taskManagerLocation = ResourceID.generate();
@@ -158,6 +160,12 @@ public class NettyShuffleEnvironmentBuilder {
         return this;
     }
 
+    public NettyShuffleEnvironmentBuilder setMaxOverdraftBuffersPerGate(
+            int maxOverdraftBuffersPerGate) {
+        this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
+        return this;
+    }
+
     public NettyShuffleEnvironmentBuilder setCompressionCodec(String compressionCodec) {
         this.compressionCodec = compressionCodec;
         return this;
@@ -217,7 +225,8 @@ public class NettyShuffleEnvironmentBuilder {
                         sortShuffleMinParallelism,
                         debloatConfiguration,
                         maxNumberOfConnections,
-                        connectionReuseEnabled),
+                        connectionReuseEnabled,
+                        maxOverdraftBuffersPerGate),
                 taskManagerLocation,
                 new TaskEventDispatcher(),
                 resultPartitionManager,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
index 2be1d6a42a4..4d6d1260d1d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
@@ -127,7 +127,7 @@ public class RecordWriterDelegateTest extends TestLogger {
     }
 
     private RecordWriter createRecordWriter(NetworkBufferPool globalPool) throws Exception {
-        final BufferPool localPool = globalPool.createBufferPool(1, 1, 1, Integer.MAX_VALUE);
+        final BufferPool localPool = globalPool.createBufferPool(1, 1, 1, Integer.MAX_VALUE, 0);
         final ResultPartitionWriter partition =
                 new ResultPartitionBuilder().setBufferPoolFactory(() -> localPool).build();
         partition.setup();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index ea511479569..a12cef50714 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -300,7 +300,7 @@ public class RecordWriterTest {
     public void testIsAvailableOrNot() throws Exception {
         // setup
         final NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
-        final BufferPool localPool = globalPool.createBufferPool(1, 1, 1, Integer.MAX_VALUE);
+        final BufferPool localPool = globalPool.createBufferPool(1, 1, 1, Integer.MAX_VALUE, 0);
         final ResultPartitionWriter resultPartition =
                 new ResultPartitionBuilder().setBufferPoolFactory(() -> localPool).build();
         resultPartition.setup();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 7401b5c6e9e..269734f5930 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.buffer;
 
+import org.apache.flink.core.fs.AutoCloseableRegistry;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.testutils.executor.TestExecutorResource;
@@ -36,7 +37,9 @@ import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -45,6 +48,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -439,7 +443,7 @@ public class LocalBufferPoolTest extends TestLogger {
     @Test
     public void testMaxBuffersPerChannelAndAvailability() throws Exception {
         localBufferPool.lazyDestroy();
-        localBufferPool = new LocalBufferPool(networkBufferPool, 1, Integer.MAX_VALUE, 3, 2);
+        localBufferPool = new LocalBufferPool(networkBufferPool, 1, Integer.MAX_VALUE, 3, 2, 0);
         localBufferPool.setNumBuffers(10);
 
         assertTrue(localBufferPool.getAvailableFuture().isDone());
@@ -549,6 +553,105 @@ public class LocalBufferPoolTest extends TestLogger {
         }
     }
 
+    @Test
+    public void testOverdraftBufferAndAvailability() throws Exception {
+        for (int maxOverdraftBuffers = 0; maxOverdraftBuffers < 3; maxOverdraftBuffers++) {
+            useAllOverdraftBuffersAndCheckIsLegal(4, 3, maxOverdraftBuffers, 2, 1);
+            useAllOverdraftBuffersAndCheckIsLegal(4, 3, maxOverdraftBuffers, 2, 2);
+            useAllOverdraftBuffersAndCheckIsLegal(4, 3, maxOverdraftBuffers, 3, 2);
+
+            useAllOverdraftBuffersAndCheckIsLegal(8, 5, maxOverdraftBuffers, 2, 1);
+            useAllOverdraftBuffersAndCheckIsLegal(8, 5, maxOverdraftBuffers, 2, 2);
+            useAllOverdraftBuffersAndCheckIsLegal(8, 5, maxOverdraftBuffers, 3, 2);
+
+            useAllOverdraftBuffersAndCheckIsLegal(12, 10, maxOverdraftBuffers, 2, 1);
+            useAllOverdraftBuffersAndCheckIsLegal(12, 10, maxOverdraftBuffers, 2, 2);
+            useAllOverdraftBuffersAndCheckIsLegal(12, 10, maxOverdraftBuffers, 3, 2);
+        }
+    }
+
+    private void useAllOverdraftBuffersAndCheckIsLegal(
+            int poolSize,
+            int maxBuffersPerChannel,
+            int maxOverdraftBuffers,
+            int numberOfChannels,
+            int availableChannels)
+            throws Exception {
+        checkArgument(maxBuffersPerChannel > poolSize / numberOfChannels);
+        checkArgument(numberOfChannels >= availableChannels);
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(
+                        networkBufferPool,
+                        1,
+                        Integer.MAX_VALUE,
+                        numberOfChannels,
+                        maxBuffersPerChannel,
+                        maxOverdraftBuffers);
+        bufferPool.setNumBuffers(poolSize);
+
+        // Request all buffers inside the buffer pool
+        Map<Integer, AutoCloseableRegistry> closeableRegistryMap = new HashMap<>();
+        for (int i = 0; i < poolSize; i++) {
+            int targetChannel = i % availableChannels;
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(targetChannel);
+            assertNotNull(bufferBuilder);
+            closeableRegistryMap
+                    .computeIfAbsent(targetChannel, channel -> new AutoCloseableRegistry())
+                    .registerCloseable(bufferBuilder);
+            boolean isAvailable =
+                    (i + 1 < poolSize) && i < availableChannels * (maxBuffersPerChannel - 1);
+            assertRequestedBufferAndIsAvailable(bufferPool, 0, i + 1, isAvailable);
+        }
+
+        // request overdraft buffer
+        AutoCloseableRegistry overdraftCloseableRegistry = new AutoCloseableRegistry();
+        for (int i = 0; i < maxOverdraftBuffers; i++) {
+            int targetChannel = i % availableChannels;
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(targetChannel);
+            assertNotNull(bufferBuilder);
+            overdraftCloseableRegistry.registerCloseable(bufferBuilder);
+            int numberOfRequestedOverdraftBuffer = i + 1;
+            assertRequestedBufferAndIsAvailable(
+                    bufferPool,
+                    numberOfRequestedOverdraftBuffer,
+                    poolSize + numberOfRequestedOverdraftBuffer,
+                    false);
+        }
+
+        for (int i = 0; i < numberOfChannels; i++) {
+            assertNull(bufferPool.requestBufferBuilder(i));
+            assertRequestedBufferAndIsAvailable(
+                    bufferPool, maxOverdraftBuffers, poolSize + maxOverdraftBuffers, false);
+        }
+
+        // release all bufferBuilder
+        overdraftCloseableRegistry.close();
+        assertRequestedBufferAndIsAvailable(bufferPool, 0, poolSize, false);
+        int numberOfRequestedBuffer = poolSize;
+        for (AutoCloseableRegistry closeableRegistry : closeableRegistryMap.values()) {
+            numberOfRequestedBuffer =
+                    numberOfRequestedBuffer - closeableRegistry.getNumberOfRegisteredCloseables();
+            closeableRegistry.close();
+            assertRequestedBufferAndIsAvailable(bufferPool, 0, numberOfRequestedBuffer, true);
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    private void assertRequestedBufferAndIsAvailable(
+            LocalBufferPool bufferPool,
+            int numberOfRequestedOverdraftBuffer,
+            int numberOfRequestedBuffer,
+            boolean isAvailable) {
+        if (numberOfRequestedOverdraftBuffer > 0) {
+            checkArgument(!isAvailable);
+        }
+        assertEquals(
+                numberOfRequestedOverdraftBuffer,
+                bufferPool.getNumberOfRequestedOverdraftMemorySegments());
+        assertEquals(numberOfRequestedBuffer, bufferPool.bestEffortGetNumOfUsedBuffers());
+        assertEquals(isAvailable, bufferPool.getAvailableFuture().isDone());
+    }
+
     // ------------------------------------------------------------------------
     // Helpers
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 7c9d81ebaab..2d2edbe36e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -80,6 +80,8 @@ public class ResultPartitionBuilder {
 
     private String compressionCodec = "LZ4";
 
+    private int maxOverdraftBuffersPerGate = 5;
+
     public ResultPartitionBuilder setResultPartitionIndex(int partitionIndex) {
         this.partitionIndex = partitionIndex;
         return this;
@@ -203,6 +205,11 @@ public class ResultPartitionBuilder {
         return this;
     }
 
+    public ResultPartitionBuilder setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {
+        this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
+        return this;
+    }
+
     public ResultPartition build() {
         ResultPartitionFactory resultPartitionFactory =
                 new ResultPartitionFactory(
@@ -220,7 +227,8 @@ public class ResultPartitionBuilder {
                         maxBuffersPerChannel,
                         sortShuffleMinBuffers,
                         sortShuffleMinParallelism,
-                        sslEnabled);
+                        sslEnabled,
+                        maxOverdraftBuffersPerGate);
 
         SupplierWithException<BufferPool, IOException> factory =
                 bufferPoolFactory.orElseGet(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 0c54e3878a0..f0b31679f37 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -125,7 +125,8 @@ public class ResultPartitionFactoryTest extends TestLogger {
                         Integer.MAX_VALUE,
                         10,
                         sortShuffleMinParallelism,
-                        false);
+                        false,
+                        0);
 
         final ResultPartitionDeploymentDescriptor descriptor =
                 new ResultPartitionDeploymentDescriptor(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 4fbaf198b52..cf1e4ec9e3b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -373,7 +373,7 @@ public class ResultPartitionTest {
         // setup
         int bufferSize = 1024;
         NetworkBufferPool globalPool = new NetworkBufferPool(10, bufferSize);
-        BufferPool localPool = globalPool.createBufferPool(1, 1, 1, Integer.MAX_VALUE);
+        BufferPool localPool = globalPool.createBufferPool(1, 1, 1, Integer.MAX_VALUE, 0);
         BufferWritingResultPartition resultPartition =
                 (BufferWritingResultPartition)
                         new ResultPartitionBuilder().setBufferPoolFactory(() -> localPool).build();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 1df9fc556a2..363dc1403c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -173,7 +173,8 @@ public class LocalInputChannelTest {
                                                     producerBufferPoolSize,
                                                     producerBufferPoolSize,
                                                     parallelism,
-                                                    Integer.MAX_VALUE))
+                                                    Integer.MAX_VALUE,
+                                                    0))
                             .build();
 
             // Create a buffer pool for this partition
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
index 7306ad3a749..8f7eec638b4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
@@ -180,7 +180,8 @@ public class ChannelPersistenceITCase {
                                                 numberOfSubpartitions,
                                                 Integer.MAX_VALUE,
                                                 numberOfSubpartitions,
-                                                Integer.MAX_VALUE))
+                                                Integer.MAX_VALUE,
+                                                0))
                         .build();
         resultPartition.setup();
         return (BufferWritingResultPartition) resultPartition;