Posted to issues@flink.apache.org by "reswqa (via GitHub)" <gi...@apache.org> on 2023/04/11 17:52:52 UTC

[GitHub] [flink] reswqa opened a new pull request, #22381: [FLINK-31763][runtime] Ensure that the total number of requested buffers does not exceed poolSize + maxOverdraftBuffersPerGate

reswqa opened a new pull request, #22381:
URL: https://github.com/apache/flink/pull/22381

   ## What is the purpose of the change
   
   *As we discussed in [FLINK-31610](https://issues.apache.org/jira/browse/FLINK-31610), new buffers can be requested only when `numberOfRequestedMemorySegments + numberOfRequestedOverdraftMemorySegments < poolSize + maxOverdraftBuffersPerGate`.*
   
   *Consider a scenario where `currentPoolSize = 5`, `numberOfRequestedMemorySegments = 7`, and `maxOverdraftBuffersPerGate = 2`. If `numberOfRequestedOverdraftMemorySegments = 0`, then `2` more buffers can still be requested, which would bring the total to `9` and exceed `poolSize + maxOverdraftBuffersPerGate = 7`.*
   
   *We should convert `numberOfRequestedMemorySegments` to `numberOfRequestedOverdraftMemorySegments` when poolSize is decreased.*
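The scenario above can be checked with a small standalone model. This is only a sketch: the class `OverdraftScenario` and its methods are hypothetical and are not part of `LocalBufferPool`, and the "old" check is an assumption (it is taken to compare only the overdraft counter against `maxOverdraftBuffersPerGate`).

```java
final class OverdraftScenario {
    // Assumed pre-fix check: only the overdraft counter is compared with the limit.
    static boolean canRequestOverdraftOld(int overdraft, int maxOverdraft) {
        return overdraft < maxOverdraft;
    }

    // Condition stated in the PR description: total requested < poolSize + maxOverdraft.
    static boolean canRequestNew(int requested, int overdraft, int poolSize, int maxOverdraft) {
        return requested + overdraft < poolSize + maxOverdraft;
    }

    public static void main(String[] args) {
        int poolSize = 5, requested = 7, overdraft = 0, maxOverdraft = 2;
        // Old check still allows overdraft requests although 7 buffers are already held.
        System.out.println(canRequestOverdraftOld(overdraft, maxOverdraft)); // true
        // Stated condition: 7 + 0 < 5 + 2 does not hold, so no new buffer is allowed.
        System.out.println(canRequestNew(requested, overdraft, poolSize, maxOverdraft)); // false
    }
}
```

Under these assumptions, the stated condition only holds again once the task has recycled at least one buffer.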
   
   
   ## Brief change log
   
     - *Convert `numberOfRequestedMemorySegments` to `numberOfRequestedOverdraftMemorySegments` when poolSize is decreased.*
   
   
   ## Verifying this change
   
   Add unit test in `LocalBufferPoolTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): yes (but per-buffer)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22381:
URL: https://github.com/apache/flink/pull/22381#issuecomment-1506298316

   Thanks @1996fanrui for the quick reply, I have updated this, PTAL~




[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1164959231


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -766,13 +776,12 @@ private void returnExcessMemorySegments() {
 
     @GuardedBy("availableMemorySegments")
     private boolean hasExcessBuffers() {
-        return numberOfRequestedOverdraftMemorySegments > 0
-                || numberOfRequestedMemorySegments > currentPoolSize;
+        return numberOfRequestedOverdraftMemorySegments > 0;

Review Comment:
   IIUC, if we allow all buffers that exceed poolSize to be `overdraft`, then `numberOfRequestedMemorySegments > currentPoolSize` will never happen, right? @1996fanrui can you help me double-check this? Thx~





[GitHub] [flink] 1996fanrui commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1165094621


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -255,9 +253,38 @@ void testRecycleAfterDestroy() {
     void testDecreasePoolSize() throws Exception {
         final int maxMemorySegments = 10;
         final int requiredMemorySegments = 4;
-        final int maxOverdraftBuffers = 2;
-        final int largePoolSize = 5;
-        final int smallPoolSize = 4;
+
+        // requested buffers is equal to small pool size.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 2, 5, 0, 5, 0);
+        // requested buffers is less than small pool size.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 6, 4, 2, 2, 0, 3, 1);
+        // exceed buffers is equal to maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 2, 7, 2, 5, 0);
+        // exceed buffers is greater than maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 9, 5, 3, 9, 4, 5, 0);
+        // exceed buffers is less than maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 4, 7, 2, 5, 0);
+        // decrease pool size with overdraft buffer.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 6, 9, 4, 5, 0);

Review Comment:
   These 2 tests cover the same case, `exceed buffers is less than maxOverdraftBuffers`, right? Could the last one be removed?
   
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -49,7 +49,11 @@
  *
  * <p>The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It
  * will then lazily return the required number of buffers to the {@link NetworkBufferPool} to match
- * its new size.
+ * its new size. New buffers can be requested only when {@code numberOfRequestedMemorySegments +
+ * numberOfRequestedOverdraftMemorySegments < currentPoolSize + maxOverdraftBuffersPerGate}. In

Review Comment:
   Yes, we cannot return these buffers while the task is using them.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -255,9 +253,38 @@ void testRecycleAfterDestroy() {
     void testDecreasePoolSize() throws Exception {
         final int maxMemorySegments = 10;
         final int requiredMemorySegments = 4;
-        final int maxOverdraftBuffers = 2;
-        final int largePoolSize = 5;
-        final int smallPoolSize = 4;
+
+        // requested buffers is equal to small pool size.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 2, 5, 0, 5, 0);
+        // requested buffers is less than small pool size.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 6, 4, 2, 2, 0, 3, 1);
+        // exceed buffers is equal to maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 2, 7, 2, 5, 0);
+        // exceed buffers is greater than maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 9, 5, 3, 9, 4, 5, 0);
+        // exceed buffers is less than maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 4, 7, 2, 5, 0);
+        // decrease pool size with overdraft buffer.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 6, 9, 4, 5, 0);
+    }
+
+    void testDecreasePoolSizeInternal(
+            int maxMemorySegments,
+            int requiredMemorySegments,
+            int largePoolSize,
+            int smallPoolSize,
+            int maxOverdraftBuffers,
+            int numBuffersToRequest,
+            int numOverdraftBuffersAfterDecreasePoolSize,
+            int numRequestedBuffersAfterDecreasePoolSize,

Review Comment:
   The parameter names may need to be renamed; they are not very clear. In particular, I thought `numRequestedBuffersAfterDecreasePoolSize` was the total number of buffers (ordinary + overdraft) requested by the client from the LocalBufferPool.
   
   How about renaming them to `numRequestedOverdraftBuffersAfterDecreasing` and `numRequestedOrdinaryBuffersAfterDecreasing`? `PoolSize` is dropped because the method name already includes `DecreasePoolSize` and the name would otherwise be too long.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -766,13 +776,12 @@ private void returnExcessMemorySegments() {
 
     @GuardedBy("availableMemorySegments")
     private boolean hasExcessBuffers() {
-        return numberOfRequestedOverdraftMemorySegments > 0
-                || numberOfRequestedMemorySegments > currentPoolSize;
+        return numberOfRequestedOverdraftMemorySegments > 0;

Review Comment:
   I have checked, and it looks OK from my side. However, I would prefer to invite more experts to review this PR; that will be more reliable.
   
   Hi @pnowojski @akalash, would you mind taking a look at this PR in your free time? Thanks.





[GitHub] [flink] akalash commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

Posted by "akalash (via GitHub)" <gi...@apache.org>.
akalash commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1166841760


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -766,13 +776,12 @@ private void returnExcessMemorySegments() {
 
     @GuardedBy("availableMemorySegments")
     private boolean hasExcessBuffers() {
-        return numberOfRequestedOverdraftMemorySegments > 0
-                || numberOfRequestedMemorySegments > currentPoolSize;
+        return numberOfRequestedOverdraftMemorySegments > 0;

Review Comment:
   Technically, this change is correct, but since `numberOfRequestedOverdraftMemorySegments` and `numberOfRequestedMemorySegments` can be changed independently, I would prefer to leave it as is. We can make this change as soon as we get rid of `numberOfRequestedOverdraftMemorySegments` (if we do it at all).
   Maybe I am a little paranoid about this, but the current contract between `numberOfRequestedOverdraftMemorySegments` and `numberOfRequestedMemorySegments` looks a bit fragile, so I am afraid it is easy to introduce a bug where `numberOfRequestedOverdraftMemorySegments > 0 == false` while `numberOfRequestedMemorySegments > currentPoolSize == true`.
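The concern about the fragile contract can be illustrated with a standalone sketch. The class `ExcessBufferCheck` is hypothetical; it only mirrors the two forms of the condition shown in the diff and does not use any Flink code. The out-of-sync state in `main` is an assumed example of the kind of bug described, not an observed one.

```java
final class ExcessBufferCheck {
    // Simplified form from the diff: relies on the two counters always being in sync.
    static boolean simplified(int overdraft) {
        return overdraft > 0;
    }

    // Original form: also catches ordinary buffers above the current pool size.
    static boolean defensive(int requested, int overdraft, int poolSize) {
        return overdraft > 0 || requested > poolSize;
    }

    public static void main(String[] args) {
        // Hypothetical out-of-sync state: the pool shrank to 5, but the two extra
        // ordinary buffers were never re-labelled as overdraft buffers.
        int requested = 7, overdraft = 0, poolSize = 5;
        System.out.println(simplified(overdraft));                     // false
        System.out.println(defensive(requested, overdraft, poolSize)); // true
    }
}
```

When the counters are kept in sync the two forms agree; they differ only in states like the one above, which is where the original two-part condition acts as a safety net.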



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -766,13 +776,12 @@ private void returnExcessMemorySegments() {
 
     @GuardedBy("availableMemorySegments")
     private boolean hasExcessBuffers() {
-        return numberOfRequestedOverdraftMemorySegments > 0
-                || numberOfRequestedMemorySegments > currentPoolSize;
+        return numberOfRequestedOverdraftMemorySegments > 0;
     }
 
     @GuardedBy("availableMemorySegments")
     private boolean isRequestedSizeReached() {
-        return numberOfRequestedMemorySegments >= currentPoolSize;
+        return numberOfRequestedMemorySegments == currentPoolSize;

Review Comment:
   I think you should not change this condition. In my opinion, the rule of thumb for concurrent code is something like this: "If the value can physically be greater than the border value, we should compare it with the border using >=, even if logically it cannot be greater."
   In this case, there is a moment when `numberOfRequestedMemorySegments > currentPoolSize`. It happens under synchronization, but still.
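A minimal sketch of why the rule of thumb matters, using a hypothetical `RequestedSizeCheck` class rather than the real pool. It assumes a caller that keeps requesting buffers until the check reports that the size is reached, with `hardLimit` added only so that the demonstration terminates.

```java
final class RequestedSizeCheck {
    static boolean reachedExact(int requested, int poolSize) {
        return requested == poolSize;
    }

    static boolean reachedDefensive(int requested, int poolSize) {
        return requested >= poolSize;
    }

    // Counts how many buffers are granted before the check stops the caller.
    // hardLimit is a safety cap for this sketch only.
    static int requestUntilReached(int requested, int poolSize, boolean defensive, int hardLimit) {
        int granted = 0;
        while (granted < hardLimit) {
            boolean reached = defensive
                    ? reachedDefensive(requested, poolSize)
                    : reachedExact(requested, poolSize);
            if (reached) {
                break;
            }
            requested++;
            granted++;
        }
        return granted;
    }

    public static void main(String[] args) {
        // Normal state: both checks stop after 2 more buffers.
        System.out.println(requestUntilReached(3, 5, false, 100)); // 2
        System.out.println(requestUntilReached(3, 5, true, 100));  // 2
        // State already above the border: == never fires, >= stops immediately.
        System.out.println(requestUntilReached(6, 5, false, 100)); // 100
        System.out.println(requestUntilReached(6, 5, true, 100));  // 0
    }
}
```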





[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1164941748


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -49,7 +49,11 @@
  *
  * <p>The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It
  * will then lazily return the required number of buffers to the {@link NetworkBufferPool} to match
- * its new size.
+ * its new size. New buffers can be requested only when {@code numberOfRequestedMemorySegments +
+ * numberOfRequestedOverdraftMemorySegments < currentPoolSize + maxOverdraftBuffersPerGate}. In

Review Comment:
   Yes, good catch! I was stupid when typing the commit message. 🤣 
   
   In fact, maybe we can never guarantee that `the total number of requested buffers(requested + overdraft) does not exceed poolSize + maxOverdraftBuffersPerGate`, because we will not force buffers to return when the pool size changes, right?
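The point about buffers not being forced to return can be sketched with a toy model. `LazyReturnModel` is hypothetical and tracks only a single in-use counter; it assumes that shrinking the pool changes the limit but leaves buffers held by the task untouched until they are recycled.

```java
final class LazyReturnModel {
    int poolSize;
    final int maxOverdraft;
    int inUse; // buffers currently held by the task (ordinary + overdraft)

    LazyReturnModel(int poolSize, int maxOverdraft) {
        this.poolSize = poolSize;
        this.maxOverdraft = maxOverdraft;
    }

    // A new buffer is granted only while the total stays below the bound.
    boolean request() {
        if (inUse < poolSize + maxOverdraft) {
            inUse++;
            return true;
        }
        return false;
    }

    // Shrinking the pool does not take buffers away from the task.
    void setPoolSize(int newSize) {
        poolSize = newSize;
    }

    void recycle() {
        if (inUse > 0) {
            inUse--;
        }
    }

    boolean exceedsBound() {
        return inUse > poolSize + maxOverdraft;
    }

    public static void main(String[] args) {
        LazyReturnModel pool = new LazyReturnModel(10, 2);
        for (int i = 0; i < 12; i++) {
            pool.request();
        }
        pool.setPoolSize(5);
        System.out.println(pool.exceedsBound()); // true: 12 > 5 + 2
        System.out.println(pool.request());      // false: no new buffers meanwhile
    }
}
```

In this model the bound restricts new requests, but the number of buffers already held can sit above it until enough of them are recycled.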





[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1165105388


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -766,13 +776,12 @@ private void returnExcessMemorySegments() {
 
     @GuardedBy("availableMemorySegments")
     private boolean hasExcessBuffers() {
-        return numberOfRequestedOverdraftMemorySegments > 0
-                || numberOfRequestedMemorySegments > currentPoolSize;
+        return numberOfRequestedOverdraftMemorySegments > 0;

Review Comment:
   > I prefer inviting more experts to review this PR, it will be more reliable.
   
   Sure, It will be better~





[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1166952794


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -766,13 +776,12 @@ private void returnExcessMemorySegments() {
 
     @GuardedBy("availableMemorySegments")
     private boolean hasExcessBuffers() {
-        return numberOfRequestedOverdraftMemorySegments > 0
-                || numberOfRequestedMemorySegments > currentPoolSize;
+        return numberOfRequestedOverdraftMemorySegments > 0;

Review Comment:
   > we can make this change as soon as we get rid of numberOfRequestedOverdraftMemorySegments(if we will do it at all).
   
   Yes, I'm sure we'll be removing `numberOfRequestedOverdraftMemorySegments` soon as keeping it would complicate the contract a bit.
   
   I agree that it's easy to introduce some hard-to-find bugs, especially since we'll be refactoring it soon. I will revert the change here.





[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1164944912


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -671,13 +675,20 @@ public void setNumBuffers(int numBuffers) {
 
             currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments);
 
-            // reset overdraft buffers
+            // If pool size increases, try to convert overdraft buffer to ordinary buffer.
             while (numberOfRequestedOverdraftMemorySegments > 0
                     && numberOfRequestedMemorySegments < currentPoolSize) {
                 numberOfRequestedOverdraftMemorySegments--;
                 numberOfRequestedMemorySegments++;
             }
 
+            // If pool size decreases, try to convert ordinary buffer to overdraft buffer.
+            while (numberOfRequestedMemorySegments > currentPoolSize
+                    && numberOfRequestedOverdraftMemorySegments < maxOverdraftBuffersPerGate) {
+                numberOfRequestedMemorySegments--;
+                numberOfRequestedOverdraftMemorySegments++;
+            }

Review Comment:
   > From the discussion of [FLINK-31610](https://issues.apache.org/jira/browse/FLINK-31610), this restriction(numberOfRequestedOverdraftMemorySegments <= maxOverdraftBuffersPerGate) will be broken in [FLINK-31764](https://issues.apache.org/jira/browse/FLINK-31764), right?
   
   Yep, you are absolutely right.
   
   > If so, I prefer [FLINK-31764](https://issues.apache.org/jira/browse/FLINK-31764) only refactor the code, not change the overdraft strategy or behavior, and we change the strategy in this PR, WDYT?
   
   Changing the semantics/behavior in this PR also makes sense to me. Let's update FLINK-31764 to simply remove the redundant field.





[GitHub] [flink] flinkbot commented on pull request #22381: [FLINK-31763][runtime] Ensure that the total number of requested buffers does not exceed poolSize + maxOverdraftBuffersPerGate

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22381:
URL: https://github.com/apache/flink/pull/22381#issuecomment-1503852904

   ## CI report:
   
   * d4ac5891e842a21920525a5ed5f0ccce5206718c UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1165111871


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -255,9 +253,38 @@ void testRecycleAfterDestroy() {
     void testDecreasePoolSize() throws Exception {
         final int maxMemorySegments = 10;
         final int requiredMemorySegments = 4;
-        final int maxOverdraftBuffers = 2;
-        final int largePoolSize = 5;
-        final int smallPoolSize = 4;
+
+        // requested buffers is equal to small pool size.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 2, 5, 0, 5, 0);
+        // requested buffers is less than small pool size.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 6, 4, 2, 2, 0, 3, 1);
+        // exceed buffers is equal to maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 2, 7, 2, 5, 0);
+        // exceed buffers is greater than maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 9, 5, 3, 9, 4, 5, 0);
+        // exceed buffers is less than maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 4, 7, 2, 5, 0);
+        // decrease pool size with overdraft buffer.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 6, 9, 4, 5, 0);

Review Comment:
   The difference between the two test cases is that the latter one already holds `overdraft buffer`s before the `poolSize` is changed. IMO, it is better to cover this situation in the unit tests. Of course, there is no major problem in removing this case.
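The difference between the last two cases can be traced with a simplified counter model. `DecreasePoolSizeModel` is hypothetical and is not the test helper from the PR. It assumes that ordinary buffers are handed out first, then overdraft buffers up to `maxOverdraft`, and that every ordinary buffer above the new pool size is re-labelled as overdraft when the pool shrinks. The arguments are the third to sixth values of the two calls in the diff.

```java
final class DecreasePoolSizeModel {
    int poolSize;
    final int maxOverdraft;
    int ordinary;
    int overdraft;

    DecreasePoolSizeModel(int poolSize, int maxOverdraft) {
        this.poolSize = poolSize;
        this.maxOverdraft = maxOverdraft;
    }

    // Hands out ordinary buffers first, then overdraft buffers up to the limit.
    void request(int n) {
        for (int i = 0; i < n; i++) {
            if (ordinary < poolSize) {
                ordinary++;
            } else if (overdraft < maxOverdraft) {
                overdraft++;
            }
        }
    }

    // Assumed behavior: all ordinary buffers above the new size become overdraft.
    void decreasePoolSize(int newSize) {
        poolSize = newSize;
        while (ordinary > poolSize) {
            ordinary--;
            overdraft++;
        }
    }

    // Returns {overdraft, ordinary} after shrinking the pool.
    static int[] run(int largePoolSize, int smallPoolSize, int maxOverdraft, int toRequest) {
        DecreasePoolSizeModel m = new DecreasePoolSizeModel(largePoolSize, maxOverdraft);
        m.request(toRequest);
        m.decreasePoolSize(smallPoolSize);
        return new int[] {m.overdraft, m.ordinary};
    }

    public static void main(String[] args) {
        int[] a = run(7, 5, 4, 7); // no overdraft buffer held before shrinking
        int[] b = run(7, 5, 6, 9); // 2 overdraft buffers held before shrinking
        System.out.println(a[0] + " " + a[1]); // 2 5
        System.out.println(b[0] + " " + b[1]); // 4 5
    }
}
```

In the second run the pool already holds 2 overdraft buffers at size 7, so shrinking to 5 adds 2 more on top of existing overdraft rather than starting from zero.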





[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1166961006


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -766,13 +776,12 @@ private void returnExcessMemorySegments() {
 
     @GuardedBy("availableMemorySegments")
     private boolean hasExcessBuffers() {
-        return numberOfRequestedOverdraftMemorySegments > 0
-                || numberOfRequestedMemorySegments > currentPoolSize;
+        return numberOfRequestedOverdraftMemorySegments > 0;
     }
 
     @GuardedBy("availableMemorySegments")
     private boolean isRequestedSizeReached() {
-        return numberOfRequestedMemorySegments >= currentPoolSize;
+        return numberOfRequestedMemorySegments == currentPoolSize;

Review Comment:
   > In my opinion, the rule of thumb for concurrent code looks something like this: "If the value physically can be greater than the border value we should also compare it with the border as >= even if logically it can not be greater".
   
   This rule really makes sense to me, and thanks for telling me about it! Maybe I was a little too radical before. After all, a world full of concurrency is not safe at all. 😞





[GitHub] [flink] reswqa commented on pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22381:
URL: https://github.com/apache/flink/pull/22381#issuecomment-1506490926

   Thanks @1996fanrui for the second round of review, PR updated.




[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1165117526


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -255,9 +253,38 @@ void testRecycleAfterDestroy() {
     void testDecreasePoolSize() throws Exception {
         final int maxMemorySegments = 10;
         final int requiredMemorySegments = 4;
-        final int maxOverdraftBuffers = 2;
-        final int largePoolSize = 5;
-        final int smallPoolSize = 4;
+
+        // requested buffers is equal to small pool size.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 2, 5, 0, 5, 0);
+        // requested buffers is less than small pool size.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 6, 4, 2, 2, 0, 3, 1);
+        // exceed buffers is equal to maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 2, 7, 2, 5, 0);
+        // exceed buffers is greater than maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 9, 5, 3, 9, 4, 5, 0);
+        // exceed buffers is less than maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 4, 7, 2, 5, 0);
+        // decrease pool size with overdraft buffer.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 6, 9, 4, 5, 0);
+    }
+
+    void testDecreasePoolSizeInternal(
+            int maxMemorySegments,
+            int requiredMemorySegments,
+            int largePoolSize,
+            int smallPoolSize,
+            int maxOverdraftBuffers,
+            int numBuffersToRequest,
+            int numOverdraftBuffersAfterDecreasePoolSize,
+            int numRequestedBuffersAfterDecreasePoolSize,

Review Comment:
   Good suggestion, I have renamed them.





[GitHub] [flink] reswqa merged pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa merged PR #22381:
URL: https://github.com/apache/flink/pull/22381




[GitHub] [flink] reswqa commented on pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22381:
URL: https://github.com/apache/flink/pull/22381#issuecomment-1508885180

   All commits have been squashed, merged % AZP green.




[GitHub] [flink] reswqa commented on pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22381:
URL: https://github.com/apache/flink/pull/22381#issuecomment-1508782160

   Thanks @akalash for the review! I have squashed all previous commits and reverted the changes in `hasExcessBuffers` and `isRequestedSizeReached` in a fix-up commit, PTAL.




[GitHub] [flink] reswqa commented on pull request #22381: [FLINK-31763][runtime] Ensure that the total number of requested buffers does not exceed poolSize + maxOverdraftBuffersPerGate

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22381:
URL: https://github.com/apache/flink/pull/22381#issuecomment-1504989677

   @1996fanrui Would you mind taking a look at this in your free time? Thanks~




[GitHub] [flink] 1996fanrui commented on a diff in pull request #22381: [FLINK-31763][runtime] Ensure that the total number of requested buffers does not exceed poolSize + maxOverdraftBuffersPerGate

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1163988392


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -671,13 +675,20 @@ public void setNumBuffers(int numBuffers) {
 
             currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments);
 
-            // reset overdraft buffers
+            // If pool size increases, try to convert overdraft buffer to ordinary buffer.
             while (numberOfRequestedOverdraftMemorySegments > 0
                     && numberOfRequestedMemorySegments < currentPoolSize) {
                 numberOfRequestedOverdraftMemorySegments--;
                 numberOfRequestedMemorySegments++;
             }
 
+            // If pool size decreases, try to convert ordinary buffer to overdraft buffer.
+            while (numberOfRequestedMemorySegments > currentPoolSize
+                    && numberOfRequestedOverdraftMemorySegments < maxOverdraftBuffersPerGate) {
+                numberOfRequestedMemorySegments--;
+                numberOfRequestedOverdraftMemorySegments++;
+            }

Review Comment:
   Hi @reswqa , thanks for the fix.
   
   I'm curious: what's wrong with using only `numberOfRequestedMemorySegments > currentPoolSize` as the loop condition, so that all extra buffers are counted as overdraft buffers?
   
   ```suggestion
               while (numberOfRequestedMemorySegments > currentPoolSize) {
                   numberOfRequestedMemorySegments--;
                   numberOfRequestedOverdraftMemorySegments++;
               }
   ```





[GitHub] [flink] 1996fanrui commented on a diff in pull request #22381: [FLINK-31763][runtime] Ensure that the total number of requested buffers does not exceed poolSize + maxOverdraftBuffersPerGate

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1164914879


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -671,13 +675,20 @@ public void setNumBuffers(int numBuffers) {
 
             currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments);
 
-            // reset overdraft buffers
+            // If pool size increases, try to convert overdraft buffer to ordinary buffer.
             while (numberOfRequestedOverdraftMemorySegments > 0
                     && numberOfRequestedMemorySegments < currentPoolSize) {
                 numberOfRequestedOverdraftMemorySegments--;
                 numberOfRequestedMemorySegments++;
             }
 
+            // If pool size decreases, try to convert ordinary buffer to overdraft buffer.
+            while (numberOfRequestedMemorySegments > currentPoolSize
+                    && numberOfRequestedOverdraftMemorySegments < maxOverdraftBuffersPerGate) {
+                numberOfRequestedMemorySegments--;
+                numberOfRequestedOverdraftMemorySegments++;
+            }

Review Comment:
   Thanks for the clarification.
   
   > When numberOfRequestedMemorySegments <= poolSize, all buffers are ordinary buffer.
   > When numberOfRequestedMemorySegments > poolSize, the `ordinary buffer size = poolSize`, and `the overdraft buffer size = numberOfRequestedMemorySegments - poolSize`
   
   From the discussion of FLINK-31610, this restriction (`numberOfRequestedOverdraftMemorySegments <= maxOverdraftBuffersPerGate`) will be broken in FLINK-31764, right?
   
   If so, I would prefer that FLINK-31764 only refactors the code without changing the overdraft strategy or behavior, and that we change the strategy in this PR. WDYT?
   
   Please correct me if my understanding is wrong, thanks!
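   
   For illustration, the quoted rule can be read as deriving both counts from a single counter. The following is a minimal sketch under that assumption; `DerivedOverdraft`, `ordinaryBuffers` and `overdraftBuffers` are hypothetical names, not existing `LocalBufferPool` methods, and this is not the FLINK-31764 implementation.
   
   ```java
   /** Hypothetical helper that derives both counts from one total, per the quoted rule. */
   final class DerivedOverdraft {

       /** Ordinary buffers: everything up to the pool size. */
       static int ordinaryBuffers(int totalRequested, int poolSize) {
           return Math.min(totalRequested, poolSize);
       }

       /** Overdraft buffers: everything above the pool size, never negative. */
       static int overdraftBuffers(int totalRequested, int poolSize) {
           return Math.max(0, totalRequested - poolSize);
       }

       public static void main(String[] args) {
           int poolSize = 5; // assumed value, chosen only for the example
           for (int total : new int[] {3, 5, 7, 9}) {
               System.out.println(
                       "total=" + total
                               + " ordinary=" + ordinaryBuffers(total, poolSize)
                               + " overdraft=" + overdraftBuffers(total, poolSize));
           }
       }
   }
   ```
   
   With `poolSize = 5` and nine requested buffers, this reading yields four overdraft buffers, which is more than a `maxOverdraftBuffersPerGate` of 2. That is the case the question about the restriction refers to.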



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -49,7 +49,11 @@
  *
  * <p>The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It
  * will then lazily return the required number of buffers to the {@link NetworkBufferPool} to match
- * its new size.
+ * its new size. New buffers can be requested only when {@code numberOfRequestedMemorySegments +
+ * numberOfRequestedOverdraftMemorySegments < currentPoolSize + maxOverdraftBuffersPerGate}. In

Review Comment:
   This PR only ensures that new buffers cannot be requested once `the total number of requested buffers` reaches `poolSize + maxOverdraftBuffersPerGate`; it cannot ensure that `the total number of requested buffers does not exceed poolSize + maxOverdraftBuffersPerGate`, right?
   
   If so, the commit message and JIRA title should be changed.





[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Ensure that the total number of requested buffers does not exceed poolSize + maxOverdraftBuffersPerGate

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1164030247


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -671,13 +675,20 @@ public void setNumBuffers(int numBuffers) {
 
             currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments);
 
-            // reset overdraft buffers
+            // If pool size increases, try to convert overdraft buffer to ordinary buffer.
             while (numberOfRequestedOverdraftMemorySegments > 0
                     && numberOfRequestedMemorySegments < currentPoolSize) {
                 numberOfRequestedOverdraftMemorySegments--;
                 numberOfRequestedMemorySegments++;
             }
 
+            // If pool size decreases, try to convert ordinary buffer to overdraft buffer.
+            while (numberOfRequestedMemorySegments > currentPoolSize
+                    && numberOfRequestedOverdraftMemorySegments < maxOverdraftBuffersPerGate) {
+                numberOfRequestedMemorySegments--;
+                numberOfRequestedOverdraftMemorySegments++;
+            }

Review Comment:
   Thanks @1996fanrui for the quick reply.
   
   The reason for this restriction (`numberOfRequestedOverdraftMemorySegments < maxOverdraftBuffersPerGate`) is to ensure that `numberOfRequestedOverdraftMemorySegments` never becomes larger than `maxOverdraftBuffersPerGate`. If we treated all buffers exceeding poolSize as overdraft buffers, `numberOfRequestedOverdraftMemorySegments` would become a field that can be removed entirely, and I created FLINK-31764 to do exactly that. I don't plan to do that refactoring in the current PR, though; the changes here are just to ensure that the total number of requested buffers does not exceed `poolSize + maxOverdraftBuffersPerGate`.
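   
   To make the effect of the extra loop condition concrete, here is a minimal standalone sketch of the two counter-conversion loops from the diff above. `PoolCounters` and `resize` are hypothetical names used only for illustration; the real `setNumBuffers` also takes the lock, caps the size at `maxNumberOfMemorySegments` and returns excess buffers, all of which are omitted here.
   
   ```java
   /** Hypothetical, simplified model of the counters touched in setNumBuffers. */
   final class PoolCounters {
       int currentPoolSize;
       int numberOfRequestedMemorySegments;
       int numberOfRequestedOverdraftMemorySegments;
       final int maxOverdraftBuffersPerGate;

       PoolCounters(int poolSize, int requested, int overdraft, int maxOverdraft) {
           this.currentPoolSize = poolSize;
           this.numberOfRequestedMemorySegments = requested;
           this.numberOfRequestedOverdraftMemorySegments = overdraft;
           this.maxOverdraftBuffersPerGate = maxOverdraft;
       }

       /** Mirrors only the two conversion loops shown in the diff. */
       void resize(int newPoolSize) {
           currentPoolSize = newPoolSize;

           // Pool grew: count overdraft buffers as ordinary while there is room.
           while (numberOfRequestedOverdraftMemorySegments > 0
                   && numberOfRequestedMemorySegments < currentPoolSize) {
               numberOfRequestedOverdraftMemorySegments--;
               numberOfRequestedMemorySegments++;
           }

           // Pool shrank: count ordinary buffers as overdraft, but never more
           // than maxOverdraftBuffersPerGate of them.
           while (numberOfRequestedMemorySegments > currentPoolSize
                   && numberOfRequestedOverdraftMemorySegments < maxOverdraftBuffersPerGate) {
               numberOfRequestedMemorySegments--;
               numberOfRequestedOverdraftMemorySegments++;
           }
       }

       public static void main(String[] args) {
           // Scenario from the PR description: 7 ordinary buffers, pool shrinks to 5.
           PoolCounters c = new PoolCounters(7, 7, 0, 2);
           c.resize(5);
           System.out.println(
                   c.numberOfRequestedMemorySegments + " ordinary, "
                           + c.numberOfRequestedOverdraftMemorySegments + " overdraft");
       }
   }
   ```
   
   In this sketch the scenario from the PR description ends with 5 ordinary and 2 overdraft buffers, so the overdraft quota is already used up. If 9 buffers were outstanding instead, the second loop would stop at 7 ordinary and 2 overdraft, leaving the overdraft counter capped at `maxOverdraftBuffersPerGate`.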





[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Ensure that the total number of requested buffers does not exceed poolSize + maxOverdraftBuffersPerGate

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1164941748


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -49,7 +49,11 @@
  *
  * <p>The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It
  * will then lazily return the required number of buffers to the {@link NetworkBufferPool} to match
- * its new size.
+ * its new size. New buffers can be requested only when {@code numberOfRequestedMemorySegments +
+ * numberOfRequestedOverdraftMemorySegments < currentPoolSize + maxOverdraftBuffersPerGate}. In

Review Comment:
   Yes, good catch! I was stupid when typing the commit message. 🤣 
   
   In fact, maybe we can never guarantee that `the total number of requested buffers does not exceed poolSize + maxOverdraftBuffersPerGate`, because we do not force buffers to be returned when the pool size changes, right?
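   
   A small sketch of that distinction, assuming the request condition is exactly the one quoted in the Javadoc hunk above. `RequestGate` and `canRequestNewBuffer` are hypothetical names, and the numbers are made up for the example.
   
   ```java
   /** Hypothetical check modelling the condition quoted in the Javadoc hunk. */
   final class RequestGate {

       /** New buffers may be requested only while the total stays below the limit. */
       static boolean canRequestNewBuffer(
               int requested, int overdraft, int poolSize, int maxOverdraft) {
           return requested + overdraft < poolSize + maxOverdraft;
       }

       public static void main(String[] args) {
           // Assumed state: the pool held 10 ordinary + 2 overdraft buffers,
           // then its size was reduced to 5 without any buffer being recycled yet.
           int requested = 10;
           int overdraft = 2;
           int maxOverdraft = 2;
           int shrunkPoolSize = 5;

           int total = requested + overdraft;
           int limit = shrunkPoolSize + maxOverdraft;

           // The total is above the limit, yet nothing forces buffers back;
           // the check only blocks further requests.
           System.out.println(
                   "total=" + total
                           + ", limit=" + limit
                           + ", canRequest="
                           + canRequestNewBuffer(
                                   requested, overdraft, shrunkPoolSize, maxOverdraft));
       }
   }
   ```
   
   Here the total (12) stays above the limit (7) until buffers are recycled, while `canRequestNewBuffer` already returns `false`. The check blocks new requests; it does not bound the total.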


