You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/12/13 04:28:46 UTC

[flink] branch release-1.16 updated: [FLINK-29298] LocalBufferPool request buffer from NetworkBufferPool hanging.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new 40fbea33c35 [FLINK-29298] LocalBufferPool request buffer from NetworkBufferPool hanging.
40fbea33c35 is described below

commit 40fbea33c35bf9d04bff35f01554bb91c874b975
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Sep 14 12:02:12 2022 +0800

    [FLINK-29298] LocalBufferPool request buffer from NetworkBufferPool hanging.
    
    When the task thread polls the last buffer out of the LocalBufferPool and triggers the onGlobalPoolAvailable callback itself, the callback skips this notification (because the LocalBufferPool is currently available). As a result, the LocalBufferPool eventually becomes unavailable but never registers another callback with the NetworkBufferPool, so buffer requests hang.
    
    This closes #20924
---
 .../runtime/io/network/buffer/LocalBufferPool.java | 142 +++++++++++++++------
 .../io/network/buffer/LocalBufferPoolTest.java     |  33 +++++
 2 files changed, 135 insertions(+), 40 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index e5bfc087982..214f5c949d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -130,8 +130,12 @@ class LocalBufferPool implements BufferPool {
     @GuardedBy("availableMemorySegments")
     private final AvailabilityHelper availabilityHelper = new AvailabilityHelper();
 
+    /**
+     * Indicates whether this {@link LocalBufferPool} has requested to be notified on the next time
+     * that global pool becoming available, so it can then request buffer from the global pool.
+     */
     @GuardedBy("availableMemorySegments")
-    private boolean requestingWhenAvailable;
+    private boolean requestingNotificationOfGlobalPoolAvailable;
 
     /**
      * Local buffer pool based on the given <tt>networkBufferPool</tt> with a minimal number of
@@ -232,14 +236,10 @@ class LocalBufferPool implements BufferPool {
         this.maxBuffersPerChannel = maxBuffersPerChannel;
         this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
 
-        // Lock is only taken, because #checkAvailability asserts it. It's a small penalty for
-        // thread safety.
+        // Lock is only taken, because #checkAndUpdateAvailability asserts it. It's a small penalty
+        // for thread safety.
         synchronized (this.availableMemorySegments) {
-            if (checkAvailability()) {
-                availabilityHelper.resetAvailable();
-            }
-
-            checkConsistentAvailability();
+            checkAndUpdateAvailability();
         }
     }
 
@@ -412,11 +412,7 @@ class LocalBufferPool implements BufferPool {
                 }
             }
 
-            if (!checkAvailability()) {
-                availabilityHelper.resetUnavailable();
-            }
-
-            checkConsistentAvailability();
+            checkAndUpdateAvailability();
         }
         return segment;
     }
@@ -475,22 +471,22 @@ class LocalBufferPool implements BufferPool {
      * multiple {@link LocalBufferPool}s might wait on the future of the global pool, hence this
      * method double-check if a new buffer is really needed at the time it becomes available.
      */
+    @GuardedBy("availableMemorySegments")
     private void requestMemorySegmentFromGlobalWhenAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        if (requestingWhenAvailable) {
-            return;
-        }
-        requestingWhenAvailable = true;
-
+        checkState(
+                !requestingNotificationOfGlobalPoolAvailable,
+                "local buffer pool is already in the state of requesting memory segment from global when it is available.");
+        requestingNotificationOfGlobalPoolAvailable = true;
         assertNoException(
                 networkBufferPool.getAvailableFuture().thenRun(this::onGlobalPoolAvailable));
     }
 
     private void onGlobalPoolAvailable() {
-        CompletableFuture<?> toNotify = null;
+        CompletableFuture<?> toNotify;
         synchronized (availableMemorySegments) {
-            requestingWhenAvailable = false;
+            requestingNotificationOfGlobalPoolAvailable = false;
             if (isDestroyed || availabilityHelper.isApproximatelyAvailable()) {
                 // there is currently no benefit to obtain buffer from global; give other pools
                 // precedent
@@ -502,9 +498,7 @@ class LocalBufferPool implements BufferPool {
             // #requestMemorySegmentFromGlobalWhenAvailable again if no segment could be fetched
             // because of
             // concurrent requests from different LocalBufferPools.
-            if (checkAvailability()) {
-                toNotify = availabilityHelper.getUnavailableToResetAvailable();
-            }
+            toNotify = checkAndUpdateAvailability();
         }
         mayNotifyAvailable(toNotify);
     }
@@ -517,21 +511,48 @@ class LocalBufferPool implements BufferPool {
                 && numberOfRequestedOverdraftMemorySegments == 0;
     }
 
-    private boolean checkAvailability() {
+    @GuardedBy("availableMemorySegments")
+    private CompletableFuture<?> checkAndUpdateAvailability() {
+        assert Thread.holdsLock(availableMemorySegments);
+
+        CompletableFuture<?> toNotify = null;
+
+        AvailabilityStatus availabilityStatus = checkAvailability();
+        if (availabilityStatus.isAvailable()) {
+            toNotify = availabilityHelper.getUnavailableToResetAvailable();
+        } else {
+            availabilityHelper.resetUnavailable();
+        }
+        if (availabilityStatus.isNeedRequestingNotificationOfGlobalPoolAvailable()) {
+            requestMemorySegmentFromGlobalWhenAvailable();
+        }
+
+        checkConsistentAvailability();
+        return toNotify;
+    }
+
+    @GuardedBy("availableMemorySegments")
+    private AvailabilityStatus checkAvailability() {
         assert Thread.holdsLock(availableMemorySegments);
 
         if (!availableMemorySegments.isEmpty()) {
-            return shouldBeAvailable();
+            return AvailabilityStatus.from(shouldBeAvailable(), false);
         }
         if (isRequestedSizeReached()) {
-            return false;
+            return AvailabilityStatus.UNAVAILABLE_NEED_NOT_REQUESTING_NOTIFICATION;
         }
-
-        // There aren't availableMemorySegments and we continue to request new memory segment.
+        boolean needRequestingNotificationOfGlobalPoolAvailable = false;
+        // There aren't availableMemorySegments and we continue to request new memory segment from
+        // global pool.
         if (!requestMemorySegmentFromGlobal()) {
-            requestMemorySegmentFromGlobalWhenAvailable();
+            // If we can not get a buffer from global pool, we should request from it when it
+            // becomes available. It should be noted that if we are already in this status, do not
+            // need to repeat the request.
+            needRequestingNotificationOfGlobalPoolAvailable =
+                    !requestingNotificationOfGlobalPoolAvailable;
         }
-        return shouldBeAvailable();
+        return AvailabilityStatus.from(
+                shouldBeAvailable(), needRequestingNotificationOfGlobalPoolAvailable);
     }
 
     private void checkConsistentAvailability() {
@@ -633,7 +654,7 @@ class LocalBufferPool implements BufferPool {
 
     @Override
     public void setNumBuffers(int numBuffers) {
-        CompletableFuture<?> toNotify = null;
+        CompletableFuture<?> toNotify;
         synchronized (availableMemorySegments) {
             checkArgument(
                     numBuffers >= numberOfRequiredMemorySegments,
@@ -648,18 +669,12 @@ class LocalBufferPool implements BufferPool {
             if (isDestroyed) {
                 // FLINK-19964: when two local buffer pools are released concurrently, one of them
                 // gets buffers assigned
-                // make sure that checkAvailability is not called as it would pro-actively acquire
-                // one buffer from NetworkBufferPool
+                // make sure that checkAndUpdateAvailability is not called as it would pro-actively
+                // acquire one buffer from NetworkBufferPool.
                 return;
             }
 
-            if (checkAvailability()) {
-                toNotify = availabilityHelper.getUnavailableToResetAvailable();
-            } else {
-                availabilityHelper.resetUnavailable();
-            }
-
-            checkConsistentAvailability();
+            toNotify = checkAndUpdateAvailability();
         }
 
         mayNotifyAvailable(toNotify);
@@ -757,4 +772,51 @@ class LocalBufferPool implements BufferPool {
             bufferPool.recycle(memorySegment, channel);
         }
     }
+
+    /**
+     * This class represents the buffer pool's current ground-truth availability and whether to
+     * request buffer from global pool when it is available.
+     */
+    private enum AvailabilityStatus {
+        AVAILABLE(true, false),
+        UNAVAILABLE_NEED_REQUESTING_NOTIFICATION(false, true),
+        UNAVAILABLE_NEED_NOT_REQUESTING_NOTIFICATION(false, false);
+
+        /** Indicates whether the {@link LocalBufferPool} is currently available. */
+        private final boolean available;
+
+        /**
+         * Indicates whether to requesting notification of global pool when it becomes available.
+         */
+        private final boolean needRequestingNotificationOfGlobalPoolAvailable;
+
+        AvailabilityStatus(
+                boolean available, boolean needRequestingNotificationOfGlobalPoolAvailable) {
+            this.available = available;
+            this.needRequestingNotificationOfGlobalPoolAvailable =
+                    needRequestingNotificationOfGlobalPoolAvailable;
+        }
+
+        public boolean isAvailable() {
+            return available;
+        }
+
+        public boolean isNeedRequestingNotificationOfGlobalPoolAvailable() {
+            return needRequestingNotificationOfGlobalPoolAvailable;
+        }
+
+        public static AvailabilityStatus from(
+                boolean available, boolean needRequestingNotificationOfGlobalPoolAvailable) {
+            if (available) {
+                checkState(
+                        !needRequestingNotificationOfGlobalPoolAvailable,
+                        "available local buffer pool should not request from global.");
+                return AVAILABLE;
+            } else if (needRequestingNotificationOfGlobalPoolAvailable) {
+                return UNAVAILABLE_NEED_REQUESTING_NOTIFICATION;
+            } else {
+                return UNAVAILABLE_NEED_NOT_REQUESTING_NOTIFICATION;
+            }
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 269734f5930..3149697471d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.fs.AutoCloseableRegistry;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.testutils.executor.TestExecutorResource;
 import org.apache.flink.util.TestLogger;
@@ -28,6 +29,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.jupiter.api.Timeout;
 import org.mockito.Mockito;
 
 import javax.annotation.Nullable;
@@ -248,6 +250,37 @@ public class LocalBufferPoolTest extends TestLogger {
         }
     }
 
+    @Test
+    @Timeout(30)
+    public void testRequestBuffersOnRecycle() throws Exception {
+        BufferPool bufferPool1 = networkBufferPool.createBufferPool(512, 2048);
+        List<MemorySegment> segments = new ArrayList<>();
+        for (int i = 0; i < 1023; i++) {
+            segments.add(bufferPool1.requestMemorySegmentBlocking());
+        }
+        BufferPool bufferPool2 = networkBufferPool.createBufferPool(512, 512);
+        List<MemorySegment> segments2 = new ArrayList<>();
+        CheckedThread checkedThread =
+                new CheckedThread() {
+                    @Override
+                    public void go() throws Exception {
+                        for (int i = 0; i < 512; i++) {
+                            segments2.add(bufferPool2.requestMemorySegmentBlocking());
+                        }
+                    }
+                };
+        checkedThread.start();
+        for (MemorySegment segment : segments) {
+            bufferPool1.recycle(segment);
+        }
+        bufferPool1.lazyDestroy();
+        checkedThread.sync();
+        for (MemorySegment segment : segments2) {
+            bufferPool2.recycle(segment);
+        }
+        bufferPool2.lazyDestroy();
+    }
+
     @Test
     public void testRecycleExcessBuffersAfterRecycling() {
         localBufferPool.setNumBuffers(numBuffers);