Posted to issues@flink.apache.org by "reswqa (via GitHub)" <gi...@apache.org> on 2023/03/02 17:37:21 UTC

[GitHub] [flink] reswqa opened a new pull request, #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

reswqa opened a new pull request, #22084:
URL: https://github.com/apache/flink/pull/22084

   ## What is the purpose of the change
   
   *In our current implementation, overdraft buffers can be requested as soon as there is no available buffer, which does not match the semantics implied by the name `overdraft`. We expect additional buffers to be requested only when the pool size has been reached.*
   
   *Another key point is the definition of LocalBufferPool's `available` status. Before overdraft buffers were introduced, its definition was very clear:
   there is at least one segment in `availableMemorySegments` and no subpartition has reached `maxBuffersPerChannel`.
   IMO, introducing the overdraft mechanism should not break this protocol; otherwise it may cause some very subtle bugs.
   Note that even if we only allow requesting overdraft buffers after the `poolSize` has been reached, there may still be a situation (for example, after the pool size is increased) in which both the number of `availableMemorySegments` and the number of requested overdraft buffers are non-zero, and this state should obviously be defined as available.*
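   To make the two candidate definitions concrete, here is a minimal Java sketch. `PoolState` and its methods are hypothetical and only mirror the counters named above; they are not Flink's `LocalBufferPool` API. The state with one free segment and two outstanding overdraft buffers is where the two definitions disagree.

```java
/**
 * Hypothetical, simplified snapshot of the counters LocalBufferPool consults when
 * deciding availability. Field names mirror those in the PR; this is not Flink code.
 */
final class PoolState {
    final int availableSegments;          // size of availableMemorySegments
    final int unavailableSubpartitions;   // unavailableSubpartitionsCount
    final int requestedOverdraftSegments; // numberOfRequestedOverdraftMemorySegments

    PoolState(int availableSegments, int unavailableSubpartitions, int requestedOverdraftSegments) {
        this.availableSegments = availableSegments;
        this.unavailableSubpartitions = unavailableSubpartitions;
        this.requestedOverdraftSegments = requestedOverdraftSegments;
    }

    /** Pre-overdraft definition: at least one free segment and no subpartition at its limit. */
    boolean availableByOriginalDefinition() {
        return availableSegments > 0 && unavailableSubpartitions == 0;
    }

    /** Variant that additionally requires that no overdraft buffer is in use. */
    boolean availableWithOverdraftCheck() {
        return availableByOriginalDefinition() && requestedOverdraftSegments == 0;
    }
}
```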
   
   
   ## Brief change log
   
     - *Add missing @GuardedBy annotation for LocalBufferPool.*
     - *LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached.*
   
   
   ## Verifying this change
   
   This change was tested manually by running TPC-DS, and unit tests were added.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): yes
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1146535330


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   FYI, I agree that removing this condition from `shouldBeAvailable` is not required to fix this bug. In order not to block this ticket, I added this condition back.





[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1146504412


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   Thanks @akalash for the timely feedback.
   
   It makes sense to remove `numberOfRequestedOverdraftMemorySegments == 0` in another PR. I have to say, I like your suggestion about refactoring this class, as the current logic looks bloated and a little confusing. Once we sort out the relationship between these concepts, it will also help us understand whether we need to keep `numberOfRequestedOverdraftMemorySegments == 0`.
   
   I will keep this condition in this PR to fix this bug first, and take a closer look at whether it's necessary to remove it once the refactoring is complete.





[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1129022590


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -242,23 +243,206 @@ public void testRecycleAfterDestroy() {
         localBufferPool.lazyDestroy();
 
         // All buffers have been requested, but can not be returned yet.
-        assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+        assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
         // Recycle should return buffers to memory segment pool
         for (Buffer buffer : requests) {
             buffer.recycleBuffer();
         }
     }
 
+    @Test
+    void testDecreasePoolSize() throws Exception {
+        final int maxMemorySegments = 10;
+        final int requiredMemorySegments = 4;
+        final int maxOverdraftBuffers = 2;
+        final int largePoolSize = 5;
+        final int smallPoolSize = 4;
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(
+                        networkBufferPool,
+                        requiredMemorySegments,
+                        maxMemorySegments,
+                        0,
+                        Integer.MAX_VALUE,
+                        maxOverdraftBuffers);
+        Queue<MemorySegment> buffers = new LinkedList<>();
+
+        // set a larger pool size.
+        bufferPool.setNumBuffers(largePoolSize);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize);
+
+        // request all buffers.
+        for (int i = 0; i < largePoolSize; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 1 overdraft buffer.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set a small pool size.
+        bufferPool.setNumBuffers(smallPoolSize);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return all overdraft buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return the excess buffer.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.isAvailable()).isFalse();
+        // return non-excess buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isTrue();
+
+        while (!buffers.isEmpty()) {
+            bufferPool.recycle(buffers.poll());
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    @Test
+    void testIncreasePoolSize() throws Exception {

Review Comment:
   Sure, I added a test for `largePoolSize = 7`.
   As for `largePoolSize = 8`, I think it is no different from the current case of `largePoolSize = 10`: it also exceeds the total number of buffers but does not reach `maxMemorySegments`. But for clarity, I changed the previous test to `largePoolSize = 8` and added a test for the case where the total number of buffers reaches `maxMemorySegments`.





[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1128947802


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -242,23 +243,206 @@ public void testRecycleAfterDestroy() {
         localBufferPool.lazyDestroy();
 
         // All buffers have been requested, but can not be returned yet.
-        assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+        assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
         // Recycle should return buffers to memory segment pool
         for (Buffer buffer : requests) {
             buffer.recycleBuffer();
         }
     }
 
+    @Test
+    void testDecreasePoolSize() throws Exception {
+        final int maxMemorySegments = 10;
+        final int requiredMemorySegments = 4;
+        final int maxOverdraftBuffers = 2;
+        final int largePoolSize = 5;
+        final int smallPoolSize = 4;
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(
+                        networkBufferPool,
+                        requiredMemorySegments,
+                        maxMemorySegments,
+                        0,
+                        Integer.MAX_VALUE,
+                        maxOverdraftBuffers);
+        Queue<MemorySegment> buffers = new LinkedList<>();
+
+        // set a larger pool size.
+        bufferPool.setNumBuffers(largePoolSize);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize);
+
+        // request all buffers.
+        for (int i = 0; i < largePoolSize; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 1 overdraft buffer.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set a small pool size.
+        bufferPool.setNumBuffers(smallPoolSize);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return all overdraft buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return the excess buffer.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.isAvailable()).isFalse();
+        // return non-excess buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isTrue();
+
+        while (!buffers.isEmpty()) {
+            bufferPool.recycle(buffers.poll());
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    @Test
+    void testIncreasePoolSize() throws Exception {

Review Comment:
   I agree, fixed.





[GitHub] [flink] reswqa commented on pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22084:
URL: https://github.com/apache/flink/pull/22084#issuecomment-1482982621

   Thanks @akalash and @1996fanrui for the review.
   
   > Is there an easy way to check that we allocate an overdraft buffer only when `isRequestedSizeReached()`? I mean, we could create a test version of `networkBufferPool` to check this.
   
   The main reasons why I don't think there's an easy way are:
   - `NetworkBufferPool` does not distinguish between `overdraft` and `non-overdraft` buffer requests.
   - `NetworkBufferPool` itself is not an interface, so it is not easy to write a clean mock for it (e.g. a `TestingNetworkBufferPool`). Of course, this could also be done through inheritance, but then our tests would still partly depend on its implementation.
   
   One solution I can think of: we keep requesting and returning buffers until there are `currentPoolSize` buffers in `availableMemorySegments`. Next, we intercept the `requestPooledMemorySegment` method of a test version of `NetworkBufferPool` and keep requesting buffers. Then we check that this method is called only when `isRequestedSizeReached()` is satisfied. Is this really worth the added complexity? The tests introduced in this PR already cover this part, at least to some extent.
   
   Perhaps it is enough to add more checks to tests like `testDecreasePoolSize` to ensure that the condition is met before an overdraft buffer is requested. WDYT?
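   A rough sketch of the interception idea, under stated assumptions: `GlobalSegmentSource` and `CheckingSegmentSource` are hypothetical names. Because `NetworkBufferPool` is a concrete class rather than an interface, a real test would need a subclass or wrapper in their place, which is exactly the coupling concern raised above. The double is armed only after the warm-up phase, so that every later global request is expected to be an overdraft request.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical seam for the global pool; NetworkBufferPool does not implement this. */
interface GlobalSegmentSource {
    Object requestPooledSegment();
}

/**
 * Test double: once armed (i.e. after the local pool already holds currentPoolSize
 * segments), it records every global request made while the requested size is not reached.
 */
final class CheckingSegmentSource implements GlobalSegmentSource {
    private final BooleanSupplier requestedSizeReached;
    private boolean armed;
    private int checkedCalls;
    private int prematureCalls;

    CheckingSegmentSource(BooleanSupplier requestedSizeReached) {
        this.requestedSizeReached = requestedSizeReached;
    }

    /** Starts checking; calls made before this (the warm-up phase) are ignored. */
    void arm() {
        armed = true;
    }

    @Override
    public Object requestPooledSegment() {
        if (armed) {
            checkedCalls++;
            if (!requestedSizeReached.getAsBoolean()) {
                prematureCalls++; // an overdraft request made too early
            }
        }
        return new Object(); // placeholder for a MemorySegment
    }

    int checkedCalls() {
        return checkedCalls;
    }

    int prematureCalls() {
        return prematureCalls;
    }
}
```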
   
   




[GitHub] [flink] 1996fanrui commented on a diff in pull request #22084: [WIP][FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1125605884


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -396,7 +396,7 @@ private MemorySegment requestMemorySegment(int targetChannel) {
         synchronized (availableMemorySegments) {
             checkDestroyed();
 
-            if (availableMemorySegments.isEmpty()) {
+            if (availableMemorySegments.isEmpty() && isRequestedSizeReached()) {

Review Comment:
   I'm not sure whether `availableMemorySegments.poll();` should be executed when `availableMemorySegments.isEmpty()`.
   
   1. How about calling it only when `!availableMemorySegments.isEmpty()`?
   2. We should add a comment for the `requestOverdraftMemorySegmentFromGlobal` call explaining when an overdraft buffer is requested.
   
   ```suggestion
               if (!availableMemorySegments.isEmpty()) {
                   segment = availableMemorySegments.poll();
               } else if (isRequestedSizeReached()) {
                   // Request an overdraft buffer only when the number of requested buffers reaches the upper limit
                   segment = requestOverdraftMemorySegmentFromGlobal();
               }
   ```
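   The suggested request path can be modelled with plain counters. This is a simplified, single-threaded sketch, not Flink code: `OverdraftPoolModel` is hypothetical, refilling `availableMemorySegments` from the global pool is reduced to an explicit `fetchFromGlobal()` call, and `isRequestedSizeReached()` is assumed to mean `numberOfRequestedMemorySegments >= currentPoolSize`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of the suggested request path; names mirror LocalBufferPool only. */
final class OverdraftPoolModel {
    private final Deque<Integer> availableSegments = new ArrayDeque<>(); // availableMemorySegments
    private final int poolSize;             // currentPoolSize
    private final int maxOverdraftBuffers;  // overdraft limit for this pool
    private int requestedSegments;          // numberOfRequestedMemorySegments
    private int requestedOverdraftSegments; // numberOfRequestedOverdraftMemorySegments
    private int nextSegmentId;              // ids stand in for real MemorySegments

    OverdraftPoolModel(int poolSize, int maxOverdraftBuffers) {
        this.poolSize = poolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
    }

    /** Fetches one segment from the "global pool" into the local queue if the pool size allows. */
    boolean fetchFromGlobal() {
        if (requestedSegments >= poolSize) {
            return false;
        }
        requestedSegments++;
        availableSegments.add(nextSegmentId++);
        return true;
    }

    boolean isRequestedSizeReached() {
        return requestedSegments >= poolSize;
    }

    /** Returns a segment id, or null if the caller would have to wait. */
    Integer requestSegment() {
        if (!availableSegments.isEmpty()) {
            return availableSegments.poll();
        } else if (isRequestedSizeReached()) {
            // Only once the pool size is exhausted may an overdraft buffer be requested.
            return requestOverdraftSegment();
        }
        return null;
    }

    private Integer requestOverdraftSegment() {
        if (requestedOverdraftSegments >= maxOverdraftBuffers) {
            return null;
        }
        requestedOverdraftSegments++;
        return nextSegmentId++;
    }

    int requestedOverdraftSegments() {
        return requestedOverdraftSegments;
    }
}
```

   With `poolSize = 2`, a request that finds the local queue empty while only one segment has been requested returns `null` instead of overdrafting; only after both segments are handed out does the next request get an overdraft buffer.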





[GitHub] [flink] 1996fanrui commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1128996203


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   > Actually, before overdraft buffer was introduced, the definition of available was very clear:
   There is at least one availableMemorySegment and no subpartition has reached maxBuffersPerChannel.
   IMO, introducing the overdraft mechanism should not break this protocol, overdraft buffer should not affect the judgment of availability.
   
   Sorry, but I think the overdraft buffer should affect the judgment of availability.
   
   As we discussed before, overdraft buffers are only used when the number of requested buffers has reached the upper limit (the pool size). In other words: _**overdraft buffers should only be used after the LocalBufferPool becomes unavailable.**_
   
   And why is it named `overdraft`? Because it temporarily uses some extra buffers outside the LocalBufferPool. By the semantics of overdraft, if `numberOfRequestedOverdraftMemorySegments > 0`, then the LocalBufferPool must be unavailable. That's why I added the condition here.
   
   Why do you want to remove it? I guess there was a bug before: overdraft buffers were used even when the number of requested buffers had not reached the upper limit, and you have fixed that in this PR.
   
   ```
               if (!availableMemorySegments.isEmpty()) {
                   segment = availableMemorySegments.poll();
               } else if (isRequestedSizeReached()) {
                   // Request an overdraft buffer only when the number of requested buffers
                   // has reached the upper limit (i.e. the current pool size).
                   segment = requestOverdraftMemorySegmentFromGlobal();
               }
   ```
   
   > For me, the key point is that if we think "The availableMemorySegments is always empty when numberOfRequestedOverdraftMemorySegments != 0." is tenable.
   
   If it holds now and in the future, we can remove `&& numberOfRequestedOverdraftMemorySegments == 0` here; if not, it cannot be removed.
   
   Based on your feedback, I prefer to keep it, because the root cause is that overdraft buffers were misused in some cases, and you have fixed that.
   
   If there are other bugs that misuse overdraft buffers, we should fix them so that overdraft buffers are used correctly, instead of marking the LocalBufferPool as available. Marking the LocalBufferPool as available directly may cause other unexpected bugs.





[GitHub] [flink] akalash commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "akalash (via GitHub)" <gi...@apache.org>.
akalash commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1146481079


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   I actually mostly agree that the `if (isRequestedSizeReached())` condition and the `while` loop inside `setNumBuffers` fix the bug of this ticket. It is difficult to say anything definite about removing `numberOfRequestedOverdraftMemorySegments == 0`. On the one hand, it is better not to change anything here and to improve it in another PR (if we still think it makes sense), since it doesn't relate to this bug. On the other hand, in the worst case (if we still have a bug), removing this condition just allows us to allocate an 'ordinary' buffer while we already have overdraft buffers, which is not great but still better than a deadlock. Let me think about it a little more. In any case, if we decide to remove it, I think it makes sense to do so in a separate commit, just to make clear where the fix is and where the prophylactic improvement is.
   
   In general, it seems we need to improve/refactor this class a little (maybe I will create a ticket for that), since I don't think we have a strong/explicit relationship between `numberOfRequestedOverdraftMemorySegments`, `numberOfRequestedMemorySegments`, `availableMemorySegments` and `currentPoolSize`. For example:
   - `!availableMemorySegments.isEmpty() && numberOfRequestedOverdraftMemorySegments != 0` should never be true
   - `numberOfRequestedMemorySegments == currentPoolSize && availableMemorySegments.isEmpty()` should always be true when `numberOfRequestedOverdraftMemorySegments > 0`
   - etc.
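   The first two relationships could be encoded as a check over plain counters, for example in tests. `PoolInvariants` is a hypothetical helper that follows the conditions exactly as written above; it is not part of Flink. The state reached in the earlier version of `testIncreasePoolSize` (pool size 10, one free segment, two overdraft buffers) violates both.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical checker for the relationships listed above, using plain counters. */
final class PoolInvariants {
    private PoolInvariants() {}

    static List<String> violations(
            int availableSegments, int requestedSegments, int poolSize, int requestedOverdraftSegments) {
        List<String> result = new ArrayList<>();
        // Free segments and outstanding overdraft buffers should never coexist.
        if (availableSegments > 0 && requestedOverdraftSegments != 0) {
            result.add("available segments while overdraft buffers are in use");
        }
        // Outstanding overdraft buffers imply an exhausted pool with nothing free.
        if (requestedOverdraftSegments > 0
                && !(requestedSegments == poolSize && availableSegments == 0)) {
            result.add("overdraft buffers in use before the pool size was exhausted");
        }
        return result;
    }
}
```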
   





[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1146504412


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   Thanks @akalash for the timely feedback.
   
   It makes sense to remove `numberOfRequestedOverdraftMemorySegments == 0` in another PR. I have to say, I like your suggestion about refactoring this class, as the current logic looks bloated and a little confusing. Once we sort out the relationship between these concepts, it will also help us understand whether we need to keep `numberOfRequestedOverdraftMemorySegments == 0`.
   
   I will keep this condition in this PR first, and take a closer look at whether it's necessary to remove it once the refactoring is complete.





[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1127489392


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -242,23 +243,129 @@ public void testRecycleAfterDestroy() {
         localBufferPool.lazyDestroy();
 
         // All buffers have been requested, but can not be returned yet.
-        assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+        assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
         // Recycle should return buffers to memory segment pool
         for (Buffer buffer : requests) {
             buffer.recycleBuffer();
         }
     }
 
+    @Test
+    void testDecreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 4, 10, 0, Integer.MAX_VALUE, 2);
+        Queue<MemorySegment> buffers = new LinkedList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(5);
+
+        // request all buffers.
+        for (int i = 0; i < 5; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 1 overdraft buffer.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set pool size to 4.
+        bufferPool.setNumBuffers(4);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(4);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return all overdraft buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return the excess buffer.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.isAvailable()).isFalse();
+        // return non-excess buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isTrue();
+
+        while (!buffers.isEmpty()) {
+            bufferPool.recycle(buffers.poll());
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    @Test
+    void testIncreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 5, 100, 0, Integer.MAX_VALUE, 2);
+        List<MemorySegment> buffers = new ArrayList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(5);
+
+        // request all buffers.
+        for (int i = 0; i < 5; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 2 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.requestMemorySegment()).isNull();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set pool size to 10.
+        bufferPool.setNumBuffers(10);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(10);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        // available status will not be influenced by overdraft.
+        assertThat(bufferPool.isAvailable()).isTrue();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.isAvailable()).isTrue();

Review Comment:
   Good catch! I think the second option is better: if the pool size becomes larger, some overdraft buffers need to be converted to requested buffers.
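   A minimal sketch of the preferred option, using hypothetical counters in place of `LocalBufferPool`'s fields: when the pool grows, outstanding overdraft buffers are re-counted as ordinary requested buffers until the new pool size is filled. This is an assumption about how a `while` loop in `setNumBuffers` could look, not the actual Flink implementation.

```java
/** Hypothetical counter model of resizing; names mirror LocalBufferPool's fields only. */
final class ResizablePoolModel {
    private final int maxPoolSize;  // maxNumberOfMemorySegments
    int poolSize;                   // currentPoolSize
    int requestedSegments;          // numberOfRequestedMemorySegments
    int requestedOverdraftSegments; // numberOfRequestedOverdraftMemorySegments

    ResizablePoolModel(int maxPoolSize, int poolSize, int requestedSegments, int requestedOverdraftSegments) {
        this.maxPoolSize = maxPoolSize;
        this.poolSize = poolSize;
        this.requestedSegments = requestedSegments;
        this.requestedOverdraftSegments = requestedOverdraftSegments;
    }

    void setNumBuffers(int numBuffers) {
        poolSize = Math.min(numBuffers, maxPoolSize);
        // While the larger pool has room, convert overdraft buffers into requested buffers.
        while (requestedOverdraftSegments > 0 && requestedSegments < poolSize) {
            requestedOverdraftSegments--;
            requestedSegments++;
        }
    }
}
```

   Shrinking the pool converts nothing; overdraft buffers are then simply returned as they are recycled.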



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22084:
URL: https://github.com/apache/flink/pull/22084#issuecomment-1452266850

   ## CI report:
   
   * 4b7c484bcd38bc2cfb1a74adc6b546a94fc96028 UNKNOWN
   
   Bot commands:
   The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build




[GitHub] [flink] reswqa commented on pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22084:
URL: https://github.com/apache/flink/pull/22084#issuecomment-1452263845

   @1996fanrui and @wsry, would you mind taking a look at this? This PR is ready for review, but it is not in its final state yet: some tests may still need to be added or migrated to JUnit 5.




[GitHub] [flink] reswqa commented on pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22084:
URL: https://github.com/apache/flink/pull/22084#issuecomment-1493721859

   FYI: if there are no other concerns, I will merge this PR tomorrow.




[GitHub] [flink] 1996fanrui commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1128996203


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   > Actually, before overdraft buffer was introduced, the definition of available was very clear:
   There is at least one availableMemorySegment and no subpartitions has reached maxBuffersPerChannel.
   IMO, Introducing the overdraft mechanism should not break this protocol, overdraft buffer should not affect the judgment of availability.
   
   Sorry, but I think overdraft buffers should affect the judgment of availability.
   
   As we discussed before, overdraft buffers are only used once the number of requested buffers has reached the upper limit (the pool size). In other words, overdraft buffers should only be used after the LocalBufferPool has become unavailable.
   
   And why is it called `overdraft`? Because it temporarily uses some extra buffers outside the LocalBufferPool. By the semantics of overdraft, if `numberOfRequestedOverdraftMemorySegments > 0`, the LocalBufferPool must be unavailable. That's why I added the condition here.
   
   Why do you want to remove it? I guess there was a bug before: overdraft buffers could be used even when the number of requested buffers had not reached the upper limit, and you have fixed that in this PR.
   
   ```
               if (!availableMemorySegments.isEmpty()) {
                   segment = availableMemorySegments.poll();
               } else if (isRequestedSizeReached()) {
                   // Only when the buffer request reaches the upper limit(i.e. current pool size),
                   // requests an overdraft buffer.
                   segment = requestOverdraftMemorySegmentFromGlobal();
               }
   ```
   
   > For me, the key point is that if we think "The availableMemorySegments is always empty when numberOfRequestedOverdraftMemorySegments != 0." is tenable.
   
   If it holds now and in the future, we can remove the `&& numberOfRequestedOverdraftMemorySegments == 0` here; if not, it cannot be removed.
   
   Based on your feedback, I prefer to keep it, because the root cause was that overdraft buffers were misused in some cases, and you have fixed that.
   
   If there is another bug that misuses overdraft buffers, we should fix that bug so that overdraft buffers are used correctly, instead of marking the LocalBufferPool as available. Marking the LocalBufferPool as available directly may cause other unexpected bugs.
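   Here is a single-threaded sketch of this argument. It relies on several assumptions:
   - `ToyBufferPool` is a hypothetical model that uses plain `Object`s as segments.
   - It keeps one segment prefetched while below the pool size.
   - It counts overdraft buffers toward the pool size, using the `requested + overdraft >= poolSize` form suggested in this review.
   - Its `recycle` method pays back overdraft buffers before refilling the available queue. This is an assumption of the sketch, not a statement about Flink's code.
   
   An overdraft buffer is only taken when the available queue is empty and the pool size is reached. So in this model, the extra `overdraft == 0` term never conflicts with `!availableSegments.isEmpty()`.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   
   class ToyPoolDemo {
       public static void main(String[] args) {
           ToyBufferPool pool = new ToyBufferPool(5, 2);
           Deque<Object> held = new ArrayDeque<>();
           for (int i = 0; i < 5; i++) {
               held.push(pool.request());
           }
           System.out.println("pool size reached, available = " + pool.shouldBeAvailable());
           held.push(pool.request()); // served from overdraft
           System.out.println("overdraft = " + pool.overdraft + ", available = " + pool.shouldBeAvailable());
           pool.recycle(held.pop()); // pays back the overdraft buffer
           pool.recycle(held.pop()); // goes back to availableSegments
           System.out.println("overdraft = " + pool.overdraft + ", available = " + pool.shouldBeAvailable());
       }
   }
   
   // Hypothetical single-threaded model; not Flink's LocalBufferPool.
   class ToyBufferPool {
       private final Deque<Object> availableSegments = new ArrayDeque<>();
       private final int poolSize;
       private final int maxOverdraft;
       int requested; // ordinary segments fetched from the "global" pool
       int overdraft; // overdraft segments fetched from the "global" pool
       int unavailableSubpartitions; // stays 0 in this sketch
   
       ToyBufferPool(int poolSize, int maxOverdraft) {
           this.poolSize = poolSize;
           this.maxOverdraft = maxOverdraft;
           refill();
       }
   
       private boolean isRequestedSizeReached() {
           return requested + overdraft >= poolSize;
       }
   
       // Keep one segment ready while below the pool size.
       private void refill() {
           if (availableSegments.isEmpty() && !isRequestedSizeReached()) {
               requested++;
               availableSegments.add(new Object());
           }
       }
   
       Object request() {
           refill();
           Object segment = availableSegments.poll();
           if (segment == null && overdraft < maxOverdraft) {
               // refill() found no room, so the pool size is reached: borrow an overdraft buffer.
               overdraft++;
               segment = new Object();
           }
           refill();
           return segment; // null when both the pool and the overdraft quota are exhausted
       }
   
       void recycle(Object segment) {
           if (overdraft > 0) {
               overdraft--; // pay back the overdraft first (segment goes back to the "global" pool)
           } else {
               availableSegments.add(segment);
           }
           refill();
       }
   
       boolean shouldBeAvailable() {
           return !availableSegments.isEmpty()
                   && unavailableSubpartitions == 0
                   && overdraft == 0;
       }
   }
   ```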



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -242,23 +243,206 @@ public void testRecycleAfterDestroy() {
         localBufferPool.lazyDestroy();
 
         // All buffers have been requested, but can not be returned yet.
-        assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+        assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
         // Recycle should return buffers to memory segment pool
         for (Buffer buffer : requests) {
             buffer.recycleBuffer();
         }
     }
 
+    @Test
+    void testDecreasePoolSize() throws Exception {
+        final int maxMemorySegments = 10;
+        final int requiredMemorySegments = 4;
+        final int maxOverdraftBuffers = 2;
+        final int largePoolSize = 5;
+        final int smallPoolSize = 4;
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(
+                        networkBufferPool,
+                        requiredMemorySegments,
+                        maxMemorySegments,
+                        0,
+                        Integer.MAX_VALUE,
+                        maxOverdraftBuffers);
+        Queue<MemorySegment> buffers = new LinkedList<>();
+
+        // set a larger pool size.
+        bufferPool.setNumBuffers(largePoolSize);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize);
+
+        // request all buffer.
+        for (int i = 0; i < largePoolSize; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 1 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set a small pool size.
+        bufferPool.setNumBuffers(smallPoolSize);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return all overdraft buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return the excess buffer.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.isAvailable()).isFalse();
+        // return non-excess buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isTrue();
+
+        while (!buffers.isEmpty()) {
+            bufferPool.recycle(buffers.poll());
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    @Test
+    void testIncreasePoolSize() throws Exception {

Review Comment:
   > We can test more cases for increasing pool size, especially some boundary conditions, such as largePoolSize = 7 and largePoolSize = 8. The 6 and 10 are not boundary values, and some corner bugs cannot be tested.
   
   Could you add the `largePoolSize = 7` and `largePoolSize = 8` cases to this test?





[GitHub] [flink] 1996fanrui commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1127315078


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -242,23 +243,129 @@ public void testRecycleAfterDestroy() {
         localBufferPool.lazyDestroy();
 
         // All buffers have been requested, but can not be returned yet.
-        assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+        assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
         // Recycle should return buffers to memory segment pool
         for (Buffer buffer : requests) {
             buffer.recycleBuffer();
         }
     }
 
+    @Test
+    void testDecreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 4, 10, 0, Integer.MAX_VALUE, 2);
+        Queue<MemorySegment> buffers = new LinkedList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(5);
+
+        // request all buffer.
+        for (int i = 0; i < 5; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 1 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set pool size to 4.
+        bufferPool.setNumBuffers(4);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(4);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return all overdraft buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return the excess buffer.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.isAvailable()).isFalse();
+        // return non-excess buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isTrue();
+
+        while (!buffers.isEmpty()) {
+            bufferPool.recycle(buffers.poll());
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    @Test
+    void testIncreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 5, 100, 0, Integer.MAX_VALUE, 2);
+        List<MemorySegment> buffers = new ArrayList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);

Review Comment:
   These magic numbers could be replaced with named fields or variables, such as `oldPoolSize`, `newPoolSize` and `maxOverdraftSize`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -242,23 +243,129 @@ public void testRecycleAfterDestroy() {
         localBufferPool.lazyDestroy();
 
         // All buffers have been requested, but can not be returned yet.
-        assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+        assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
         // Recycle should return buffers to memory segment pool
         for (Buffer buffer : requests) {
             buffer.recycleBuffer();
         }
     }
 
+    @Test
+    void testDecreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 4, 10, 0, Integer.MAX_VALUE, 2);
+        Queue<MemorySegment> buffers = new LinkedList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(5);
+
+        // request all buffer.
+        for (int i = 0; i < 5; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 1 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set pool size to 4.
+        bufferPool.setNumBuffers(4);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(4);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return all overdraft buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return the excess buffer.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.isAvailable()).isFalse();
+        // return non-excess buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isTrue();
+
+        while (!buffers.isEmpty()) {
+            bufferPool.recycle(buffers.poll());
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    @Test
+    void testIncreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 5, 100, 0, Integer.MAX_VALUE, 2);
+        List<MemorySegment> buffers = new ArrayList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(5);
+
+        // request all buffer.
+        for (int i = 0; i < 5; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 2 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.requestMemorySegment()).isNull();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set pool size to 10.
+        bufferPool.setNumBuffers(10);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(10);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        // available status will not be influenced by overdraft.
+        assertThat(bufferPool.isAvailable()).isTrue();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.isAvailable()).isTrue();

Review Comment:
   There may be a bug here.
   
   When I change the 10 to 7, the pool should be unavailable; however, it is available.
   
   `bufferPool.setNumBuffers(7);` requested a new buffer from the global buffer pool into `availableMemorySegments`.
   
   The call stack is as follows:
   
   ```
   	  at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:442)
   	  at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.checkAvailability(LocalBufferPool.java:551)
   	  at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.checkAndUpdateAvailability(LocalBufferPool.java:524)
   	  at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:682)
   	  - locked <0xa5d> (a java.util.ArrayDeque)
   	  at org.apache.flink.runtime.io.network.buffer.LocalBufferPoolTest.testIncreasePoolSize(LocalBufferPoolTest.java:331)
   ```
   
   I think `isRequestedSizeReached` should be updated.
   
   
   ```
       private boolean isRequestedSizeReached() {
           return numberOfRequestedMemorySegments + numberOfRequestedOverdraftMemorySegments
                   >= currentPoolSize;
       }
   ```
   
   WDYT?
   
   BTW, we should add more tests here, such as `newPoolSize = 7`.
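   The boundary can be illustrated with a hypothetical counter-only model. Names such as `ResizeModel` and `availableAfterResize` are illustrative, not Flink API. The model makes three assumptions:
   - The scenario matches the test above: 5 requested and 2 overdraft buffers are held, and `availableMemorySegments` is empty.
   - A resize fetches one segment from the global pool only when the "requested size reached" predicate is false.
   - Availability is `!availableMemorySegments.isEmpty()`, as in the reviewed `shouldBeAvailable()`.
   
   The sketch compares a predicate that counts only ordinary buffers with the proposed one that also counts overdraft buffers, over new pool sizes 6 to 10.
   
   ```java
   class ResizeBoundaryDemo {
       public static void main(String[] args) {
           for (int newPoolSize = 6; newPoolSize <= 10; newPoolSize++) {
               System.out.printf(
                       "newPoolSize=%d requestedOnly=%b withOverdraft=%b%n",
                       newPoolSize,
                       ResizeModel.availableAfterResize(5, 2, newPoolSize, false),
                       ResizeModel.availableAfterResize(5, 2, newPoolSize, true));
           }
       }
   }
   
   // Hypothetical model; not Flink's LocalBufferPool.
   final class ResizeModel {
       static boolean isRequestedSizeReached(
               int requested, int overdraft, int poolSize, boolean countOverdraft) {
           int inUse = countOverdraft ? requested + overdraft : requested;
           return inUse >= poolSize;
       }
   
       // The available queue is empty before the resize, so the pool becomes
       // available exactly when the resize fetches a new segment.
       static boolean availableAfterResize(
               int requested, int overdraft, int newPoolSize, boolean countOverdraft) {
           return !isRequestedSizeReached(requested, overdraft, newPoolSize, countOverdraft);
       }
   }
   ```
   
   Under these assumptions, the requested-only predicate reports the pool as available for every size from 6 to 10. The predicate that also counts overdraft buffers keeps it unavailable at 6 and 7 and makes it available from 8 on. This is why 7 and 8 are the interesting boundary values.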



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -242,23 +243,129 @@ public void testRecycleAfterDestroy() {
         localBufferPool.lazyDestroy();
 
         // All buffers have been requested, but can not be returned yet.
-        assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+        assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
         // Recycle should return buffers to memory segment pool
         for (Buffer buffer : requests) {
             buffer.recycleBuffer();
         }
     }
 
+    @Test
+    void testDecreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 4, 10, 0, Integer.MAX_VALUE, 2);
+        Queue<MemorySegment> buffers = new LinkedList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(5);
+
+        // request all buffer.
+        for (int i = 0; i < 5; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 1 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set pool size to 4.
+        bufferPool.setNumBuffers(4);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(4);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return all overdraft buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return the excess buffer.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.isAvailable()).isFalse();
+        // return non-excess buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isTrue();
+
+        while (!buffers.isEmpty()) {
+            bufferPool.recycle(buffers.poll());
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    @Test
+    void testIncreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 5, 100, 0, Integer.MAX_VALUE, 2);
+        List<MemorySegment> buffers = new ArrayList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);

Review Comment:
   The same applies to testDecreasePoolSize.





[GitHub] [flink] 1996fanrui commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1145599539


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   Hi @pnowojski @akalash, could you take a look when you have time? Thanks.





[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1128937275


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   Before we allowed converting `numberOfRequestedOverdraftMemorySegments` to `numberOfRequestedMemorySegments`, there could be a situation where both `availableMemorySegment` and `overdraft buffer` are non-zero, and this state should obviously be defined as available. After that change, I'm not sure whether this situation still exists. I'm a little worried about potential multithreading bugs if we add `numberOfRequestedOverdraftMemorySegments == 0` back to `shouldBeAvailable`.
   
   For me, the key point is whether "`availableMemorySegments` is always empty when `numberOfRequestedOverdraftMemorySegments != 0`" holds. If it does, doesn't `!availableMemorySegments.isEmpty()` already imply `numberOfRequestedOverdraftMemorySegments == 0`? Even if the extra condition does not introduce a bug, why should we impose a redundant constraint?
   
   Actually, before overdraft buffers were introduced, the definition of `available` was very clear:
   there is at least one `availableMemorySegment` and no subpartition has reached `maxBuffersPerChannel`.
   IMO, introducing the overdraft mechanism should not break this protocol; overdraft buffers should not affect the judgment of availability.
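   The disagreement between the two definitions can be checked mechanically. The sketch below uses hypothetical helper names and counts instead of real segments. It enumerates small states and prints those where the two versions of `shouldBeAvailable` differ. Every such state has both available segments and outstanding overdraft buffers, which is exactly the state ruled out by the invariant "`availableMemorySegments` is empty when `numberOfRequestedOverdraftMemorySegments != 0`".
   
   ```java
   class AvailabilityPredicateDemo {
       public static void main(String[] args) {
           // Enumerate small states and report where the two definitions disagree.
           for (int available = 0; available <= 2; available++) {
               for (int overdraft = 0; overdraft <= 2; overdraft++) {
                   boolean without = AvailabilityPredicates.withoutOverdraftTerm(available, 0);
                   boolean with = AvailabilityPredicates.withOverdraftTerm(available, 0, overdraft);
                   if (without != with) {
                       System.out.println("differ at available=" + available
                               + ", overdraft=" + overdraft
                               + ", invariant holds=" + AvailabilityPredicates.invariantHolds(available, overdraft));
                   }
               }
           }
       }
   }
   
   // Hypothetical helpers; not Flink API.
   final class AvailabilityPredicates {
       // Pre-overdraft protocol: at least one available segment and no subpartition at maxBuffersPerChannel.
       static boolean withoutOverdraftTerm(int availableSegments, int unavailableSubpartitions) {
           return availableSegments > 0 && unavailableSubpartitions == 0;
       }
   
       // Variant that additionally requires no outstanding overdraft buffers.
       static boolean withOverdraftTerm(int availableSegments, int unavailableSubpartitions, int overdraft) {
           return withoutOverdraftTerm(availableSegments, unavailableSubpartitions) && overdraft == 0;
       }
   
       // The invariant under discussion: overdraft buffers exist only when no segment is available.
       static boolean invariantHolds(int availableSegments, int overdraft) {
           return overdraft == 0 || availableSegments == 0;
       }
   }
   ```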





[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1129012456


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   > Before we allows convert numberOfRequestedOverdraftMemorySegments to numberOfRequestedMemorySegments , there still be a situation where both availableMemorySegment and overdraft buffer are all not zero.
   
   As I said, only after we support converting `numberOfRequestedOverdraftMemorySegments` to `numberOfRequestedMemorySegments` can we avoid having both `available buffers` and `overdraft buffers` at the same time.
   
   It is not enough to just modify the following code, as there are still concurrency problems:
   ```
               if (!availableMemorySegments.isEmpty()) {
                   segment = availableMemorySegments.poll();
               } else if (isRequestedSizeReached()) {
                   // Only when the buffer request reaches the upper limit(i.e. current pool size),
                   // requests an overdraft buffer.
                   segment = requestOverdraftMemorySegmentFromGlobal();
               }
   ```
   
   Of course, we now allow this conversion, but I'm not sure whether other concurrency problems remain even after this fix.
   
   I admit that we should not change the current design because of possible future bugs. But there is a subtlety here: I think this is a redundant condition, and removing it could also help avoid bugs.
   
   In any case, your reasoning also makes sense and is acceptable to me. But to be safe, perhaps we should hear the opinion of @wsry, who knows more about the implementation of `LocalBufferPool` than I do.
   





[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1129012456


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   > Before we allows convert numberOfRequestedOverdraftMemorySegments to numberOfRequestedMemorySegments , there still be a situation where both availableMemorySegment and overdraft buffer are all not zero.
   
   Just as what I said, only after we supports convert `numberOfRequestedOverdraftMemorySegments` to `numberOfRequestedMemorySegments`, then can we avoid having both `available buffers` and `overdraft buffers`. 
   It is not enough to just modify the following code as there are still concurrency problems.
   ```
               if (!availableMemorySegments.isEmpty()) {
                   segment = availableMemorySegments.poll();
               } else if (isRequestedSizeReached()) {
                   // Only when the buffer request reaches the upper limit(i.e. current pool size),
                   // requests an overdraft buffer.
                   segment = requestOverdraftMemorySegmentFromGlobal();
               }
   ```
   Of course, we now allow this conversion, but I'm not sure whether there are other concurrency problems even after this fix.
   In any case, your reasoning also makes sense and is acceptable to me. But to be safe, perhaps we should hear the opinion of @wsry, who knows more about the implementation of `LocalBufferPool` than I do.
   




[GitHub] [flink] reswqa commented on pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22084:
URL: https://github.com/apache/flink/pull/22084#issuecomment-1494201748

   Squash the last two commits. Will merge once CI is green.



[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1127487679


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -242,23 +243,129 @@ public void testRecycleAfterDestroy() {
         localBufferPool.lazyDestroy();
 
         // All buffers have been requested, but can not be returned yet.
-        assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+        assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
         // Recycle should return buffers to memory segment pool
         for (Buffer buffer : requests) {
             buffer.recycleBuffer();
         }
     }
 
+    @Test
+    void testDecreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 4, 10, 0, Integer.MAX_VALUE, 2);
+        Queue<MemorySegment> buffers = new LinkedList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(5);
+
+        // request all buffer.
+        for (int i = 0; i < 5; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 1 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set pool size to 4.
+        bufferPool.setNumBuffers(4);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(4);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return all overdraft buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return the excess buffer.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.isAvailable()).isFalse();
+        // return non-excess buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isTrue();
+
+        while (!buffers.isEmpty()) {
+            bufferPool.recycle(buffers.poll());
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    @Test
+    void testIncreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 5, 100, 0, Integer.MAX_VALUE, 2);
+        List<MemorySegment> buffers = new ArrayList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);

Review Comment:
   Good suggestion.




[GitHub] [flink] 1996fanrui commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1127349541


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -242,23 +243,129 @@ public void testRecycleAfterDestroy() {
         localBufferPool.lazyDestroy();
 
         // All buffers have been requested, but can not be returned yet.
-        assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+        assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
         // Recycle should return buffers to memory segment pool
         for (Buffer buffer : requests) {
             buffer.recycleBuffer();
         }
     }
 
+    @Test
+    void testDecreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 4, 10, 0, Integer.MAX_VALUE, 2);
+        Queue<MemorySegment> buffers = new LinkedList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(5);
+
+        // request all buffer.
+        for (int i = 0; i < 5; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 1 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set pool size to 4.
+        bufferPool.setNumBuffers(4);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(4);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return all overdraft buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return the excess buffer.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.isAvailable()).isFalse();
+        // return non-excess buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isTrue();
+
+        while (!buffers.isEmpty()) {
+            bufferPool.recycle(buffers.poll());
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    @Test
+    void testIncreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 5, 100, 0, Integer.MAX_VALUE, 2);
+        List<MemorySegment> buffers = new ArrayList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(5);
+
+        // request all buffer.
+        for (int i = 0; i < 5; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 2 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.requestMemorySegment()).isNull();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set pool size to 10.
+        bufferPool.setNumBuffers(10);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(10);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        // available status will not be influenced by overdraft.
+        assertThat(bufferPool.isAvailable()).isTrue();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.isAvailable()).isTrue();

Review Comment:
   Or should we reset `numberOfRequestedMemorySegments` and `numberOfRequestedOverdraftMemorySegments` inside `setNumBuffers`?
   
   If `currentPoolSize` is increased, we should decrease `numberOfRequestedOverdraftMemorySegments` and increase `numberOfRequestedMemorySegments`.
   
   I'm not sure which solution is better, but I prefer resetting them inside `setNumBuffers` because it's clearer, and the other solutions may introduce bugs in `returnMemorySegment`.
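A minimal sketch of the suggestion to rebalance the counters inside `setNumBuffers`, assuming the only goal is to move overdraft segments into the regular count when a larger pool size leaves room for them. `PoolCounters` is a hypothetical model, not Flink's `LocalBufferPool`; it ignores the global pool entirely and uses a `synchronized` method as a stand-in for the real locking.

```java
/** Hypothetical model of the counters discussed in this review (not Flink's LocalBufferPool). */
class PoolCounters {
    int currentPoolSize;
    /** Regular segments currently requested from the global pool. */
    int numberOfRequestedMemorySegments;
    /** Segments borrowed beyond currentPoolSize. */
    int numberOfRequestedOverdraftMemorySegments;

    PoolCounters(int currentPoolSize, int requested, int overdraft) {
        this.currentPoolSize = currentPoolSize;
        this.numberOfRequestedMemorySegments = requested;
        this.numberOfRequestedOverdraftMemorySegments = overdraft;
    }

    /** Updates the pool size and reclassifies overdraft segments that now fit under it. */
    synchronized void setNumBuffers(int newPoolSize) {
        currentPoolSize = newPoolSize;
        // Free slots under the new size; zero when the pool shrank below the regular count.
        int room = Math.max(0, currentPoolSize - numberOfRequestedMemorySegments);
        int converted = Math.min(room, numberOfRequestedOverdraftMemorySegments);
        numberOfRequestedOverdraftMemorySegments -= converted;
        numberOfRequestedMemorySegments += converted;
    }
}
```

Shrinking the pool leaves both counters untouched in this sketch, which is consistent with `testDecreasePoolSize` above still expecting exactly one overdraft buffer after the size drops from 5 to 4.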




[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1129012456


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   > Before we allow converting numberOfRequestedOverdraftMemorySegments to numberOfRequestedMemorySegments, there can still be a situation where both availableMemorySegment and overdraft buffers are non-zero.
   
   As I said, only after we support converting `numberOfRequestedOverdraftMemorySegments` to `numberOfRequestedMemorySegments` can we avoid having both `available buffers` and `overdraft buffers`.
   
   It is not enough to just modify the following code, as there are still concurrency problems.
   ```
               if (!availableMemorySegments.isEmpty()) {
                   segment = availableMemorySegments.poll();
               } else if (isRequestedSizeReached()) {
                   // Only when the buffer request reaches the upper limit(i.e. current pool size),
                   // requests an overdraft buffer.
                   segment = requestOverdraftMemorySegmentFromGlobal();
               }
   ```
   
   Of course, we now allow this conversion, but I'm not sure whether there are other concurrency problems even after this fix.
   
   I admit that we should not change the current design because of possible future bugs. But there is a subtlety here: I think this is a redundant condition, and removing it can also help avoid bugs.
   
   In any case, your reasoning also makes sense and is acceptable to me, so I will not insist on my opinion. But to be safe, perhaps we should hear the opinion of @wsry, who knows more about the implementation of `LocalBufferPool` than I do.
   




[GitHub] [flink] 1996fanrui commented on pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on PR #22084:
URL: https://github.com/apache/flink/pull/22084#issuecomment-1494136309

   > FYI: If there are no other concerns, I will merge this PR tomorrow.
   
   Thanks for your work, it's ok from my side.



[GitHub] [flink] reswqa closed pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa closed pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached
URL: https://github.com/apache/flink/pull/22084



[GitHub] [flink] 1996fanrui commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1128996203


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   > Actually, before the overdraft buffer was introduced, the definition of available was very clear:
   There is at least one availableMemorySegment and no subpartition has reached maxBuffersPerChannel.
   IMO, introducing the overdraft mechanism should not break this protocol; overdraft buffers should not affect the judgment of availability.
   
   Sorry, but I think overdraft buffers should affect the judgment of availability.
   
   As we discussed before, overdraft buffers are only used when the number of requested buffers has reached the upper limit (the pool size). In other words: _**overdraft buffers should only be used after the LocalBufferPool becomes unavailable.**_
   
   Why is it called `overdraft`? Because it temporarily uses some extra buffers outside the LocalBufferPool while the pool is unavailable. By the semantics of overdraft, if `numberOfRequestedOverdraftMemorySegments > 0`, the LocalBufferPool must be unavailable. That's why I added the condition here.
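To make the disagreement concrete, here is a minimal sketch of the two predicates debated in this thread. `AvailabilitySketch` and its method names are hypothetical; only the field names mirror the diff, and locking is reduced to a `synchronized` block on the queue, in the spirit of the `@GuardedBy("availableMemorySegments")` annotation. The two variants disagree only when the local queue is non-empty while an overdraft buffer is still outstanding, which is exactly the state the reviewer argues should never occur.

```java
import java.util.ArrayDeque;

/** Hypothetical standalone model; only the field names mirror the diff. */
class AvailabilitySketch {
    final ArrayDeque<Object> availableMemorySegments = new ArrayDeque<>();
    int unavailableSubpartitionsCount;
    int numberOfRequestedOverdraftMemorySegments;

    /** Original predicate: an outstanding overdraft segment keeps the pool unavailable. */
    boolean shouldBeAvailableWithOverdraftCheck() {
        synchronized (availableMemorySegments) {
            return !availableMemorySegments.isEmpty()
                    && unavailableSubpartitionsCount == 0
                    && numberOfRequestedOverdraftMemorySegments == 0;
        }
    }

    /** Predicate proposed in the diff: overdraft segments are ignored. */
    boolean shouldBeAvailableWithoutOverdraftCheck() {
        synchronized (availableMemorySegments) {
            return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;
        }
    }
}
```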
   
   You can take a look at the background of FLIP-227 [1]. Its motivation is: when a task needs multiple network buffers to process a single record and the LocalBufferPool has no buffer (i.e., it is unavailable), the task is allowed to overdraw some buffers so that it does not get stuck.
   
   Why do you want to remove it? I guess there was a bug before: overdraft buffers were used even when the number of requested buffers had not reached the upper limit, and you have fixed that in this PR.
   
   ```
               if (!availableMemorySegments.isEmpty()) {
                   segment = availableMemorySegments.poll();
               } else if (isRequestedSizeReached()) {
                   // Only when the buffer request reaches the upper limit(i.e. current pool size),
                   // requests an overdraft buffer.
                   segment = requestOverdraftMemorySegmentFromGlobal();
               }
   ```
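A minimal single-threaded model of the request order quoted above, assuming an unlimited global pool and an arbitrary segment size. `RequestPathSketch` and all of its members are hypothetical stand-ins, not Flink's `LocalBufferPool` API. The quoted snippet only shows the first two branches; the sketch adds a third that stands in for requesting a regular segment from the global pool while the pool size has not been reached, so that overdraft is borrowed only when the local queue is empty and the size limit is hit.

```java
import java.util.ArrayDeque;

/** Hypothetical single-threaded model of the request order; not Flink's API. */
class RequestPathSketch {
    private static final int SEGMENT_SIZE = 32; // arbitrary size for the model

    private final ArrayDeque<byte[]> availableMemorySegments = new ArrayDeque<>();
    private final int currentPoolSize;
    private final int maxOverdraftBuffers;
    private int numberOfRequestedMemorySegments;
    private int numberOfRequestedOverdraftMemorySegments;

    RequestPathSketch(int currentPoolSize, int maxOverdraftBuffers) {
        this.currentPoolSize = currentPoolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
    }

    private boolean isRequestedSizeReached() {
        return numberOfRequestedMemorySegments >= currentPoolSize;
    }

    /** Returns a segment, or null once both the pool size and the overdraft budget are used up. */
    synchronized byte[] requestMemorySegment() {
        if (!availableMemorySegments.isEmpty()) {
            return availableMemorySegments.poll();
        }
        if (!isRequestedSizeReached()) {
            // Below the pool size: stands in for requesting a regular segment from the global pool.
            numberOfRequestedMemorySegments++;
            return new byte[SEGMENT_SIZE];
        }
        if (numberOfRequestedOverdraftMemorySegments < maxOverdraftBuffers) {
            // Pool size reached and nothing queued locally: only now borrow an overdraft segment.
            numberOfRequestedOverdraftMemorySegments++;
            return new byte[SEGMENT_SIZE];
        }
        return null;
    }

    /** Overdraft is paid back first; otherwise the segment is queued locally for reuse. */
    synchronized void recycle(byte[] segment) {
        if (numberOfRequestedOverdraftMemorySegments > 0) {
            numberOfRequestedOverdraftMemorySegments--;
        } else {
            availableMemorySegments.add(segment);
        }
    }

    synchronized int getNumberOfRequestedOverdraftMemorySegments() {
        return numberOfRequestedOverdraftMemorySegments;
    }
}
```

With a pool size of 5 and `maxOverdraftBuffers = 2`, the first five requests take regular segments, the next two borrow overdraft, and the eighth returns `null`, mirroring the sequence exercised in `testIncreasePoolSize`.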
   
   > For me, the key point is whether "The availableMemorySegments is always empty when numberOfRequestedOverdraftMemorySegments != 0." is tenable.
   
   If it holds now and in the future, we can remove ` && numberOfRequestedOverdraftMemorySegments == 0` here; if not, it cannot be removed.
   
   Based on your feedback, I prefer to keep it, because the root cause was that overdraft buffers were misused in some cases, and you have fixed that.
   
   If there are other bugs that misuse overdraft buffers, we should fix them to ensure overdraft buffers are used correctly, instead of marking the LocalBufferPool as available. Marking the LocalBufferPool as available directly may cause other unexpected bugs.
   
   [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer




[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1129012456


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   > Before we allow converting numberOfRequestedOverdraftMemorySegments to numberOfRequestedMemorySegments, there can still be a situation where both availableMemorySegment and overdraft buffers are non-zero.
   
   As I said, only after we support converting `numberOfRequestedOverdraftMemorySegments` to `numberOfRequestedMemorySegments` can we avoid having both `available buffers` and `overdraft buffers`.
   It is not enough to just modify the following code, as there are still concurrency problems.
   ```
               if (!availableMemorySegments.isEmpty()) {
                   segment = availableMemorySegments.poll();
               } else if (isRequestedSizeReached()) {
                   // Only when the buffer request reaches the upper limit(i.e. current pool size),
                   // requests an overdraft buffer.
                   segment = requestOverdraftMemorySegmentFromGlobal();
               }
   ```
   Of course, we now allow this conversion, but I'm not sure whether there are other concurrency problems even after this fix.
   I admit that we should not change the current design because of possible future bugs. But there is a subtlety here: I think this is a redundant condition, and removing it can also help avoid bugs.
   In any case, your reasoning also makes sense and is acceptable to me. But to be safe, perhaps we should hear the opinion of @wsry, who knows more about the implementation of `LocalBufferPool` than I do.
   




[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1129022590


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -242,23 +243,206 @@ public void testRecycleAfterDestroy() {
         localBufferPool.lazyDestroy();
 
         // All buffers have been requested, but can not be returned yet.
-        assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+        assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
         // Recycle should return buffers to memory segment pool
         for (Buffer buffer : requests) {
             buffer.recycleBuffer();
         }
     }
 
+    @Test
+    void testDecreasePoolSize() throws Exception {
+        final int maxMemorySegments = 10;
+        final int requiredMemorySegments = 4;
+        final int maxOverdraftBuffers = 2;
+        final int largePoolSize = 5;
+        final int smallPoolSize = 4;
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(
+                        networkBufferPool,
+                        requiredMemorySegments,
+                        maxMemorySegments,
+                        0,
+                        Integer.MAX_VALUE,
+                        maxOverdraftBuffers);
+        Queue<MemorySegment> buffers = new LinkedList<>();
+
+        // set a larger pool size.
+        bufferPool.setNumBuffers(largePoolSize);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize);
+
+        // request all buffer.
+        for (int i = 0; i < largePoolSize; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 1 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set a small pool size.
+        bufferPool.setNumBuffers(smallPoolSize);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return all overdraft buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return the excess buffer.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.isAvailable()).isFalse();
+        // return non-excess buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isTrue();
+
+        while (!buffers.isEmpty()) {
+            bufferPool.recycle(buffers.poll());
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    @Test
+    void testIncreasePoolSize() throws Exception {

Review Comment:
   Sure, I added a test for `largePoolSize = 7`.
   As for `largePoolSize = 8`, I think it is no different from the current case of `largePoolSize = 10`: it also exceeds the total number of buffers but does not reach `maxMemorySegments`. However, I changed the previous test to `largePoolSize = 8` and added a test for the case where the total number of buffers reaches `maxMemorySegments`.




[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1147076155


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   @1996fanrui Sure. I used a fixup commit to make it easier for everyone to see the changes I made since last time. If there are no other issues, I'll squash it.




[GitHub] [flink] 1996fanrui commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1147074854


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   Hi @akalash @reswqa, thanks for your quick feedback. That makes sense.
   
   @reswqa could you squash the last commit into the [FLINK-31293] commit?
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   Hi @akalash @reswqa, thanks for your quick feedback. That makes sense.
   
   @reswqa could you squash the last fix commit into the [FLINK-31293] commit?
   




[GitHub] [flink] 1996fanrui commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1128900085


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   Why can the `numberOfRequestedOverdraftMemorySegments == 0` condition be removed? I think there is a constraint here: `availableMemorySegments` is always empty when `numberOfRequestedOverdraftMemorySegments > 0`.
   
   This constraint did not always hold before this PR, and we want it to hold from now on, right?
   
   If so, could you add some comments to `numberOfRequestedOverdraftMemorySegments`? That would help other developers understand the constraint.
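A minimal, single-threaded sketch of what such field comments and a matching check could look like. `ToyOverdraftPool`, `request`, `recycle`, and `invariantHolds` are hypothetical names, not Flink code; the model keeps only the counters discussed in this thread, ignores locking and subpartitions, and assumes the global pool always has a free segment.

```java
import java.util.ArrayDeque;

/** Hypothetical, single-threaded model of the LocalBufferPool counters (not Flink code). */
class ToyOverdraftPool {
    /** Segments cached locally and ready to hand out. */
    private final ArrayDeque<Object> availableSegments = new ArrayDeque<>();

    private final int poolSize;
    private final int maxOverdraftBuffers;

    /** Segments taken from the global pool within the pool size (cached or handed out). */
    private int numRequested;

    /**
     * Segments taken beyond the pool size. Constraint: while this is > 0, availableSegments
     * must be empty, because overdraft is only requested once the cache is empty and the
     * pool size is reached, and recycled segments pay back overdraft before being cached.
     */
    private int numOverdraft;

    ToyOverdraftPool(int poolSize, int maxOverdraftBuffers) {
        this.poolSize = poolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
        refill();
    }

    /** Keeps one segment cached while below the pool size (global pool assumed never empty). */
    private void refill() {
        if (availableSegments.isEmpty() && numRequested < poolSize) {
            numRequested++;
            availableSegments.add(new Object());
        }
    }

    /** Returns a segment, or null when neither a regular nor an overdraft segment is left. */
    Object request() {
        Object segment;
        if (availableSegments.isEmpty() && numRequested >= poolSize) {
            if (numOverdraft >= maxOverdraftBuffers) {
                return null;
            }
            numOverdraft++;
            segment = new Object();
        } else {
            // Non-null here: refill() keeps one segment cached while below the pool size.
            segment = availableSegments.poll();
        }
        refill();
        return segment;
    }

    void recycle(Object segment) {
        if (numOverdraft > 0) {
            numOverdraft--; // pay back overdraft first; the segment goes back to the global pool
        } else {
            availableSegments.add(segment);
        }
        refill();
    }

    boolean invariantHolds() {
        return numOverdraft == 0 || availableSegments.isEmpty();
    }

    int overdraft() {
        return numOverdraft;
    }
}
```

In this model, paying back overdraft before caching a recycled segment is what keeps the constraint true.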



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -242,23 +243,206 @@ public void testRecycleAfterDestroy() {
         localBufferPool.lazyDestroy();
 
         // All buffers have been requested, but can not be returned yet.
-        assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+        assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
         // Recycle should return buffers to memory segment pool
         for (Buffer buffer : requests) {
             buffer.recycleBuffer();
         }
     }
 
+    @Test
+    void testDecreasePoolSize() throws Exception {
+        final int maxMemorySegments = 10;
+        final int requiredMemorySegments = 4;
+        final int maxOverdraftBuffers = 2;
+        final int largePoolSize = 5;
+        final int smallPoolSize = 4;
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(
+                        networkBufferPool,
+                        requiredMemorySegments,
+                        maxMemorySegments,
+                        0,
+                        Integer.MAX_VALUE,
+                        maxOverdraftBuffers);
+        Queue<MemorySegment> buffers = new LinkedList<>();
+
+        // set a larger pool size.
+        bufferPool.setNumBuffers(largePoolSize);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize);
+
+        // request all buffers.
+        for (int i = 0; i < largePoolSize; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 1 overdraft buffer.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set a small pool size.
+        bufferPool.setNumBuffers(smallPoolSize);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return all overdraft buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return the excess buffer.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.isAvailable()).isFalse();
+        // return non-excess buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isTrue();
+
+        while (!buffers.isEmpty()) {
+            bufferPool.recycle(buffers.poll());
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    @Test
+    void testIncreasePoolSize() throws Exception {

Review Comment:
   I'd prefer renaming `testIncreasePoolSize` to `testIncreasePoolSizeExceedTotalBuffers`, and turning `testIncreasePoolSizeExceedTotalBuffers` and `testIncreasePoolSizeNotExceedTotalBuffers` into normal methods instead of test methods.
   
   Then create a new `testIncreasePoolSize()` test method that calls `testIncreasePoolSizeExceedTotalBuffers` and `testIncreasePoolSizeNotExceedTotalBuffers`. We can define the parameters, such as `largePoolSize`, `smallPoolSize` and `maxOverdraftBuffers`, inside `testIncreasePoolSize()`.
   
   Why do I prefer this? It lets us test more cases of increasing the pool size, especially boundary conditions such as `largePoolSize = 7` and `largePoolSize = 8`. 6 and 10 are not boundary values, so some corner-case bugs cannot be caught.
   
   WDYT?
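A sketch of the suggested shape: one test entry point that defines the parameters and calls two plain helper methods over several pool sizes, including the boundaries. It runs against a hypothetical counters-only model (`ToyResizablePool`) instead of the real `LocalBufferPool`. It assumes `setNumBuffers` converts overdraft segments into regular ones one at a time while the requested count is below the new pool size, that the global pool always has a free segment, and it reads "exceed total buffers" as "exceeds the pool size plus overdraft already handed out"; all three are assumptions. With `smallPoolSize = 5` and `maxOverdraftBuffers = 2`, the model's boundary sits at 5 + 2 = 7: at 7 every overdraft segment is converted but the pool is still full, and at 8 it becomes available.

```java
import java.util.ArrayDeque;

/** Hypothetical counters-only model of the pool-size logic (not Flink code). */
class ToyResizablePool {
    private final ArrayDeque<Object> availableSegments = new ArrayDeque<>();
    private final int maxOverdraftBuffers;
    private int currentPoolSize;
    private int numRequested;
    private int numOverdraft;

    ToyResizablePool(int poolSize, int maxOverdraftBuffers) {
        this.currentPoolSize = poolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
        refill();
    }

    /** Keeps one segment cached while below the pool size; the global pool is assumed never empty. */
    private void refill() {
        if (availableSegments.isEmpty() && numRequested < currentPoolSize) {
            numRequested++;
            availableSegments.add(new Object());
        }
    }

    Object request() {
        Object segment;
        if (availableSegments.isEmpty() && numRequested >= currentPoolSize) {
            if (numOverdraft >= maxOverdraftBuffers) {
                return null;
            }
            numOverdraft++; // overdraft only once the pool size is reached
            segment = new Object();
        } else {
            segment = availableSegments.poll();
        }
        refill();
        return segment;
    }

    void setNumBuffers(int numBuffers) {
        currentPoolSize = numBuffers;
        // Turn outstanding overdraft into regular segments while the new size leaves room.
        while (numOverdraft > 0 && numRequested < currentPoolSize) {
            numOverdraft--;
            numRequested++;
        }
        refill();
    }

    int overdraft() {
        return numOverdraft;
    }

    boolean isAvailable() {
        return !availableSegments.isEmpty() && numOverdraft == 0;
    }
}

class IncreasePoolSizeSketch {
    /** Requests the whole small pool plus every overdraft segment, then grows the pool. */
    private static ToyResizablePool fillThenGrow(int small, int large, int maxOverdraft) {
        ToyResizablePool pool = new ToyResizablePool(small, maxOverdraft);
        for (int i = 0; i < small + maxOverdraft; i++) {
            if (pool.request() == null) {
                throw new AssertionError("request " + i + " failed");
            }
        }
        pool.setNumBuffers(large);
        return pool;
    }

    /** New size exceeds everything handed out: no overdraft left and the pool is available. */
    static void testIncreasePoolSizeExceedTotalBuffers(int small, int large, int maxOverdraft) {
        ToyResizablePool pool = fillThenGrow(small, large, maxOverdraft);
        if (pool.overdraft() != 0 || !pool.isAvailable()) {
            throw new AssertionError("exceed case failed for largePoolSize=" + large);
        }
    }

    /** New size does not exceed what is handed out: leftover overdraft, pool still unavailable. */
    static void testIncreasePoolSizeNotExceedTotalBuffers(int small, int large, int maxOverdraft) {
        ToyResizablePool pool = fillThenGrow(small, large, maxOverdraft);
        if (pool.overdraft() != small + maxOverdraft - large || pool.isAvailable()) {
            throw new AssertionError("not-exceed case failed for largePoolSize=" + large);
        }
    }

    /** The single test entry point, with all parameters defined here. */
    static void testIncreasePoolSize() {
        int smallPoolSize = 5;
        int maxOverdraftBuffers = 2;
        testIncreasePoolSizeNotExceedTotalBuffers(smallPoolSize, 6, maxOverdraftBuffers);
        testIncreasePoolSizeNotExceedTotalBuffers(smallPoolSize, 7, maxOverdraftBuffers); // boundary
        testIncreasePoolSizeExceedTotalBuffers(smallPoolSize, 8, maxOverdraftBuffers); // boundary
        testIncreasePoolSizeExceedTotalBuffers(smallPoolSize, 10, maxOverdraftBuffers);
    }
}
```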



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -669,6 +669,13 @@ public void setNumBuffers(int numBuffers) {
 
             currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments);
 
+            // reset overdraft buffers
+            while (numberOfRequestedOverdraftMemorySegments > 0
+                    && numberOfRequestedMemorySegments < currentPoolSize) {

Review Comment:
   I see it works now because we return overdraft buffers first, right?
   
   However, I think converting `numberOfRequestedMemorySegments` to `numberOfRequestedOverdraftMemorySegments` would be clearer, and it would make the semantics of these fields more explicit.
   
   I'm not sure which one is better. WDYT?





[GitHub] [flink] reswqa commented on pull request #22084: [WIP][FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22084:
URL: https://github.com/apache/flink/pull/22084#issuecomment-1455518523

   Thanks @1996fanrui for the review. I have updated this according to your comments, PTAL.




[GitHub] [flink] reswqa commented on a diff in pull request #22084: [WIP][FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1125960686


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -396,7 +396,7 @@ private MemorySegment requestMemorySegment(int targetChannel) {
         synchronized (availableMemorySegments) {
             checkDestroyed();
 
-            if (availableMemorySegments.isEmpty()) {
+            if (availableMemorySegments.isEmpty() && isRequestedSizeReached()) {

Review Comment:
   IIUC, polling a buffer from an empty `availableMemorySegments` is not necessary, since this field is guarded by the lock.
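The condition changed in this hunk can be illustrated as a pure function over the counters. `SegmentSource`, `RequestDecision`, `oldDecision`, and `newDecision` are hypothetical names; the sketch only contrasts the `-` and `+` conditions of the diff and leaves out locking, subpartition limits, the overdraft cap, and the case where the global pool has nothing free.

```java
/** Where a requested segment would come from in this sketch. */
enum SegmentSource { CACHED, OVERDRAFT, NONE }

/** Hypothetical pure-function view of the branch in requestMemorySegment (not Flink code). */
class RequestDecision {
    /** Before the change: an empty cache alone triggers an overdraft request. */
    static SegmentSource oldDecision(int numAvailable) {
        return numAvailable == 0 ? SegmentSource.OVERDRAFT : SegmentSource.CACHED;
    }

    /** After the change: overdraft only when the cache is empty AND the pool size is reached. */
    static SegmentSource newDecision(int numAvailable, int numRequested, int poolSize) {
        if (numAvailable == 0 && numRequested >= poolSize) {
            return SegmentSource.OVERDRAFT;
        }
        // Below the pool size with an empty cache, no overdraft is taken; the pool is
        // expected to obtain a regular segment from the global pool instead.
        return numAvailable > 0 ? SegmentSource.CACHED : SegmentSource.NONE;
    }
}
```

For example, with 3 of 5 segments requested and nothing cached, the old condition already takes an overdraft segment, while the new one does not.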





[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1128937275


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   Before we allowed converting `numberOfRequestedOverdraftMemorySegments` to `numberOfRequestedMemorySegments`, there could still be a situation where both `availableMemorySegment` and `overdraft buffer` are non-zero, and this state should obviously be defined as available. After that change, I'm not sure whether this situation still exists. I'm a little worried about potential multithreading bugs if we add `numberOfRequestedOverdraftMemorySegments == 0` back to `shouldBeAvailable`.
   
   For me, the key point is this: if we accept that "`availableMemorySegments` is always empty when `numberOfRequestedOverdraftMemorySegments != 0`", doesn't `!availableMemorySegments.isEmpty()` already imply `numberOfRequestedOverdraftMemorySegments == 0`? Even if the extra condition does not introduce a bug, why should we impose a redundant constraint?
   
   Actually, before the overdraft buffer was introduced, the definition of `available` was very clear:
   there is at least one `availableMemorySegment` and no subpartition has reached `maxBuffersPerChannel`.
   IMO, introducing the overdraft mechanism should not break this protocol; overdraft buffers should not affect the availability check.
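The argument can be checked mechanically on a small state space. This sketch uses hypothetical names, models only the counters, and reduces the subpartition check to a boolean flag. It enumerates states and counts where the three-condition predicate and the two-condition predicate disagree, split by whether the state respects the constraint "no cached segments while overdraft is outstanding".

```java
/** Hypothetical comparison of the two shouldBeAvailable variants (not Flink code). */
class AvailabilityCheck {
    static boolean withOverdraftCheck(int numAvailable, boolean subpartitionsOk, int numOverdraft) {
        return numAvailable > 0 && subpartitionsOk && numOverdraft == 0;
    }

    static boolean withoutOverdraftCheck(int numAvailable, boolean subpartitionsOk) {
        return numAvailable > 0 && subpartitionsOk;
    }

    /** Constraint from the review: availableMemorySegments is empty whenever overdraft > 0. */
    static boolean constraintHolds(int numAvailable, int numOverdraft) {
        return numOverdraft == 0 || numAvailable == 0;
    }

    /** Returns {disagreements where the constraint holds, disagreements where it is violated}. */
    static int[] countDisagreements(int maxCount) {
        int[] result = new int[2];
        for (int available = 0; available <= maxCount; available++) {
            for (int overdraft = 0; overdraft <= maxCount; overdraft++) {
                for (boolean ok : new boolean[] {false, true}) {
                    boolean a = withOverdraftCheck(available, ok, overdraft);
                    boolean b = withoutOverdraftCheck(available, ok);
                    if (a != b) {
                        result[constraintHolds(available, overdraft) ? 0 : 1]++;
                    }
                }
            }
        }
        return result;
    }
}
```

With counts up to 3, every disagreement falls in a state that violates the constraint, so the extra term only matters where the constraint is broken. Whether such states can be observed under concurrency is the open question in this thread.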





[GitHub] [flink] akalash commented on pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "akalash (via GitHub)" <gi...@apache.org>.
akalash commented on PR #22084:
URL: https://github.com/apache/flink/pull/22084#issuecomment-1494159310

   It is OK with me as well, but don't forget to squash the last two commits.




[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1128926904


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -669,6 +669,13 @@ public void setNumBuffers(int numBuffers) {
 
             currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments);
 
+            // reset overdraft buffers
+            while (numberOfRequestedOverdraftMemorySegments > 0
+                    && numberOfRequestedMemorySegments < currentPoolSize) {

Review Comment:
   The reason I didn't do this conversion here is that there is an upper limit on the number of overdraft buffers. If this conversion were allowed and the `pool size` became very small, `numberOfRequestedOverdraftMemorySegments` would exceed its upper limit.
   Of course, we could convert only up to the upper limit, but that would make the logic a little more complicated, and there seems to be no benefit to introducing this mechanism. WDYT?
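A counters-only sketch of the concern, under one possible reading of the proposal: when the pool shrinks, regular requested segments above the new size are relabelled as overdraft. `ShrinkConversion` and both methods are hypothetical; the sketch assumes every requested segment is currently handed out (none cached) and returns `{numRequested, numOverdraft}`.

```java
/** Hypothetical sketch of converting regular segments into overdraft on shrink (not Flink code). */
class ShrinkConversion {
    /** Unbounded relabelling: the overdraft count can exceed its limit. */
    static int[] convertUncapped(int numRequested, int numOverdraft, int newPoolSize) {
        while (numRequested > newPoolSize) {
            numRequested--;
            numOverdraft++;
        }
        return new int[] {numRequested, numOverdraft};
    }

    /** Relabel only up to maxOverdraftBuffers; the rest stays as excess regular segments. */
    static int[] convertCapped(
            int numRequested, int numOverdraft, int newPoolSize, int maxOverdraftBuffers) {
        while (numRequested > newPoolSize && numOverdraft < maxOverdraftBuffers) {
            numRequested--;
            numOverdraft++;
        }
        return new int[] {numRequested, numOverdraft};
    }
}
```

With 5 requested, 1 overdraft, a new pool size of 2, and a limit of 2, the uncapped version ends at 4 overdraft segments (above the limit), while the capped one stops at 2 overdraft and still leaves 2 excess regular segments, which is the extra bookkeeping mentioned above.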





[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1127499052


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -669,6 +669,13 @@ public void setNumBuffers(int numBuffers) {
 
             currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments);
 
+            // reset overdraft buffers
+            while (numberOfRequestedOverdraftMemorySegments > 0
+                    && numberOfRequestedMemorySegments < currentPoolSize) {

Review Comment:
   For the case where the pool size becomes smaller, I prefer not to do any special handling, because we already allow `numRequestedBuffers` to exceed the `pool size` after `setNumBuffers`. WDYT?
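A sketch of why shrinking may need no special handling: if `recycle` pays back overdraft first, then returns segments above the pool size to the global pool, and only then caches, the counters settle within the new size as buffers come back. `ShrinkingPool` and its methods are hypothetical counters-only stand-ins; the recycle order is inferred from this thread and from `testDecreasePoolSize`, not copied from Flink.

```java
/** Hypothetical counters-only model (not Flink code); requested segments start out handed out. */
class ShrinkingPool {
    private int poolSize;
    private int numRequested; // regular segments, cached or handed out
    private int numOverdraft;
    private int numCached;

    ShrinkingPool(int poolSize, int numHandedOut, int numOverdraft) {
        this.poolSize = poolSize;
        this.numRequested = numHandedOut;
        this.numOverdraft = numOverdraft;
    }

    void setNumBuffers(int newPoolSize) {
        poolSize = newPoolSize;
        // Cached excess can be returned at once; handed-out excess waits for recycle().
        while (numRequested > poolSize && numCached > 0) {
            numCached--;
            numRequested--;
        }
    }

    /** Assumed order: pay back overdraft, then return excess, then cache. */
    void recycle() {
        if (numOverdraft > 0) {
            numOverdraft--;
        } else if (numRequested > poolSize) {
            numRequested--;
        } else {
            numCached++;
        }
    }

    int requested() {
        return numRequested;
    }

    int overdraft() {
        return numOverdraft;
    }

    boolean isAvailable() {
        return numCached > 0 && numOverdraft == 0;
    }
}
```

A compressed version of the `testDecreasePoolSize` sequence: start with 5 regular and 2 overdraft segments handed out, shrink to 4, and recycle four times. The first two recycles clear the overdraft, the third returns the excess segment, and only the fourth makes the pool available.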





[GitHub] [flink] reswqa commented on pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22084:
URL: https://github.com/apache/flink/pull/22084#issuecomment-1457725482

   Thanks @1996fanrui for the review and for catching the bug. I have updated this according to your comments, please take a look again.




[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1128937275


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   Before we allowed converting `numberOfRequestedOverdraftMemorySegments` to `numberOfRequestedMemorySegments`, there could still be a situation where both `availableMemorySegment` and `overdraft buffer` are non-zero, and this state should obviously be defined as available. After that change, I'm not sure whether this situation still exists. I'm a little worried about potential multithreading bugs if we add `numberOfRequestedOverdraftMemorySegments == 0` back to `shouldBeAvailable`.
   
   For me, the key point is this: if we accept that "`availableMemorySegments` is always empty when `numberOfRequestedOverdraftMemorySegments != 0`", doesn't `!availableMemorySegments.isEmpty()` already imply `numberOfRequestedOverdraftMemorySegments == 0`? Even if the extra condition does not introduce a bug, why should we impose a redundant constraint?
   
   Actually, before the overdraft buffer was introduced, the definition of `available` was very clear:
   there is at least one `availableMemorySegment` and no subpartition has reached `maxBuffersPerChannel`.
   IMO, introducing the overdraft mechanism should not break this protocol; overdraft buffers should not affect the availability check.





[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1146535330


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   FYI, I agree that removing this condition from `shouldBeAvailable` does not affect the fix for this bug. In order not to block this ticket, I added this condition back. I am also willing to help improve or refactor it, and I'm looking forward to a clearer and easier-to-understand `LocalBufferPool`.





[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1146535330


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   FYI, I agree that removing this condition from `shouldBeAvailable` does not affect the fix for this bug. In order not to block this ticket, I added this condition back in a fix-up commit. I am also willing to help improve or refactor it, and I'm looking forward to a clearer and easier-to-understand `LocalBufferPool`.


