You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gu...@apache.org on 2022/02/07 09:15:08 UTC

[flink] branch master updated: [FLINK-25741][runtime] Skip buffer pools which have no floating buffer in buffer redistributing.

This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 855348a  [FLINK-25741][runtime] Skip buffer pools which have no floating buffer in buffer redistributing.
855348a is described below

commit 855348a9ac482f141131c302b21d4a67e4c3e0c7
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Thu Dec 30 17:27:26 2021 +0800

    [FLINK-25741][runtime] Skip buffer pools which have no floating buffer in buffer redistributing.
    
    This closes #18433.
---
 .../io/network/buffer/NetworkBufferPool.java        | 21 +++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 19e4cb8..ffdb9c7 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -83,6 +83,8 @@ public class NetworkBufferPool
 
     private final Set<LocalBufferPool> allBufferPools = new HashSet<>();
 
+    private final Set<LocalBufferPool> resizableBufferPools = new HashSet<>();
+
     private int numTotalRequiredBuffers;
 
     private final Duration requestSegmentsTimeout;
@@ -500,6 +502,10 @@ public class NetworkBufferPool
 
             allBufferPools.add(localBufferPool);
 
+            if (numRequiredBuffers < maxUsedBuffers) {
+                resizableBufferPools.add(localBufferPool);
+            }
+
             redistributeBuffers();
 
             return localBufferPool;
@@ -515,6 +521,7 @@ public class NetworkBufferPool
         synchronized (factoryLock) {
             if (allBufferPools.remove(bufferPool)) {
                 numTotalRequiredBuffers -= bufferPool.getNumberOfRequiredMemorySegments();
+                resizableBufferPools.remove(bufferPool);
 
                 redistributeBuffers();
             }
@@ -536,7 +543,9 @@ public class NetworkBufferPool
             }
 
             // some sanity checks
-            if (allBufferPools.size() > 0 || numTotalRequiredBuffers > 0) {
+            if (allBufferPools.size() > 0
+                    || numTotalRequiredBuffers > 0
+                    || resizableBufferPools.size() > 0) {
                 throw new IllegalStateException(
                         "NetworkBufferPool is not empty after destroying all LocalBufferPools");
             }
@@ -573,12 +582,16 @@ public class NetworkBufferPool
     private void redistributeBuffers() {
         assert Thread.holdsLock(factoryLock);
 
+        if (resizableBufferPools.isEmpty()) {
+            return;
+        }
+
         // All buffers, which are not among the required ones
         final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
 
         if (numAvailableMemorySegment == 0) {
             // in this case, we need to redistribute buffers so that every pool gets its minimum
-            for (LocalBufferPool bufferPool : allBufferPools) {
+            for (LocalBufferPool bufferPool : resizableBufferPools) {
                 bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
             }
             return;
@@ -594,7 +607,7 @@ public class NetworkBufferPool
 
         long totalCapacity = 0; // long to avoid int overflow
 
-        for (LocalBufferPool bufferPool : allBufferPools) {
+        for (LocalBufferPool bufferPool : resizableBufferPools) {
             int excessMax =
                     bufferPool.getMaxNumberOfMemorySegments()
                             - bufferPool.getNumberOfRequiredMemorySegments();
@@ -614,7 +627,7 @@ public class NetworkBufferPool
 
         long totalPartsUsed = 0; // of totalCapacity
         int numDistributedMemorySegment = 0;
-        for (LocalBufferPool bufferPool : allBufferPools) {
+        for (LocalBufferPool bufferPool : resizableBufferPools) {
             int excessMax =
                     bufferPool.getMaxNumberOfMemorySegments()
                             - bufferPool.getNumberOfRequiredMemorySegments();