You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/08 11:42:03 UTC

[GitHub] [flink] xintongsong commented on a diff in pull request #20924: [FLINK-29298] LocalBufferPool request buffer from NetworkBufferPool hanging.

xintongsong commented on code in PR #20924:
URL: https://github.com/apache/flink/pull/20924#discussion_r1043241608


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -130,6 +130,10 @@ class LocalBufferPool implements BufferPool {
     @GuardedBy("availableMemorySegments")
     private final AvailabilityHelper availabilityHelper = new AvailabilityHelper();
 
+    /**
+     * Indicates this {@link LocalBufferPool} will request buffer from global pool when it becomes
+     * available.
+     */

Review Comment:
   ```suggestion
       /**
        * Indicates whether this {@link LocalBufferPool} has requested to be notified on the next time that global pool becoming available, so it can then request buffer from the global pool.
        */
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -757,4 +781,47 @@ public void recycle(MemorySegment memorySegment) {
             bufferPool.recycle(memorySegment, channel);
         }
     }
+
+    /**
+     * This class represents the buffer pool's current ground-truth availability and whether to
+     * request buffer from global pool when it is available.
+     */
+    private enum AvailabilityStatus {
+        AVAILABLE(true, false),
+        UNAVAILABLE_NEED_REQUEST_FROM_GLOBAL(false, true),
+        UNAVAILABLE_NEED_NOT_REQUEST_FROM_GLOBAL(false, false);
+
+        /** Indicates whether the {@link LocalBufferPool} is currently available. */
+        private final boolean available;
+
+        /** Indicates whether to request buffer from globalPool when it is available. */
+        private final boolean needRequestFromGlobalWhenAvailable;

Review Comment:
   See comment for `requestingWhenAvailable`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -652,13 +674,15 @@ public void setNumBuffers(int numBuffers) {
                 // one buffer from NetworkBufferPool
                 return;
             }
-
-            if (checkAvailability()) {
+            AvailabilityStatus availabilityAndRequestFromGlobalPool = checkAvailability();
+            if (availabilityAndRequestFromGlobalPool.isAvailable()) {
                 toNotify = availabilityHelper.getUnavailableToResetAvailable();
             } else {
                 availabilityHelper.resetUnavailable();
             }
-
+            if (availabilityAndRequestFromGlobalPool.isNeedRequestFromGlobalWhenAvailable()) {
+                requestMemorySegmentFromGlobalWhenAvailable();
+            }
             checkConsistentAvailability();

Review Comment:
   This common pattern has repeated 4 times. It would be nice to deduplicate it.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -757,4 +781,47 @@ public void recycle(MemorySegment memorySegment) {
             bufferPool.recycle(memorySegment, channel);
         }
     }
+
+    /**
+     * This class represents the buffer pool's current ground-truth availability and whether to
+     * request buffer from global pool when it is available.
+     */
+    private enum AvailabilityStatus {
+        AVAILABLE(true, false),
+        UNAVAILABLE_NEED_REQUEST_FROM_GLOBAL(false, true),
+        UNAVAILABLE_NEED_NOT_REQUEST_FROM_GLOBAL(false, false);

Review Comment:
   See comment for `requestingWhenAvailable`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -130,6 +130,10 @@ class LocalBufferPool implements BufferPool {
     @GuardedBy("availableMemorySegments")
     private final AvailabilityHelper availabilityHelper = new AvailabilityHelper();
 
+    /**
+     * Indicates this {@link LocalBufferPool} will request buffer from global pool when it becomes
+     * available.
+     */

Review Comment:
   I'd suggest to also rename `requestingWhenAvailable` to `requestingNotificationOfGlobalPoolAvailable`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org