You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "1996fanrui (via GitHub)" <gi...@apache.org> on 2023/04/13 07:16:21 UTC

[GitHub] [flink] 1996fanrui commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

1996fanrui commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1165094621


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -255,9 +253,38 @@ void testRecycleAfterDestroy() {
     void testDecreasePoolSize() throws Exception {
         final int maxMemorySegments = 10;
         final int requiredMemorySegments = 4;
-        final int maxOverdraftBuffers = 2;
-        final int largePoolSize = 5;
-        final int smallPoolSize = 4;
+
+        // requested buffers is equal to small pool size.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 2, 5, 0, 5, 0);
+        // requested buffers is less than small pool size.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 6, 4, 2, 2, 0, 3, 1);
+        // exceed buffers is equal to maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 2, 7, 2, 5, 0);
+        // exceed buffers is greater than maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 9, 5, 3, 9, 4, 5, 0);
+        // exceed buffers is less than maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 4, 7, 2, 5, 0);
+        // decrease pool size with overdraft buffer.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 6, 9, 4, 5, 0);

Review Comment:
   These 2 tests are testing the same case that `exceed buffers is less than maxOverdraftBuffers`, right? Could the last one be removed?
   
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -49,7 +49,11 @@
  *
  * <p>The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It
  * will then lazily return the required number of buffers to the {@link NetworkBufferPool} to match
- * its new size.
+ * its new size. New buffers can be requested only when {@code numberOfRequestedMemorySegments +
+ * numberOfRequestedOverdraftMemorySegments < currentPoolSize + maxOverdraftBuffersPerGate}. In

Review Comment:
   Yes, we cannot return these buffers when task is using them .



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -255,9 +253,38 @@ void testRecycleAfterDestroy() {
     void testDecreasePoolSize() throws Exception {
         final int maxMemorySegments = 10;
         final int requiredMemorySegments = 4;
-        final int maxOverdraftBuffers = 2;
-        final int largePoolSize = 5;
-        final int smallPoolSize = 4;
+
+        // requested buffers is equal to small pool size.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 2, 5, 0, 5, 0);
+        // requested buffers is less than small pool size.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 6, 4, 2, 2, 0, 3, 1);
+        // exceed buffers is equal to maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 2, 7, 2, 5, 0);
+        // exceed buffers is greater than maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 9, 5, 3, 9, 4, 5, 0);
+        // exceed buffers is less than maxOverdraftBuffers
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 4, 7, 2, 5, 0);
+        // decrease pool size with overdraft buffer.
+        testDecreasePoolSizeInternal(
+                maxMemorySegments, requiredMemorySegments, 7, 5, 6, 9, 4, 5, 0);
+    }
+
+    void testDecreasePoolSizeInternal(
+            int maxMemorySegments,
+            int requiredMemorySegments,
+            int largePoolSize,
+            int smallPoolSize,
+            int maxOverdraftBuffers,
+            int numBuffersToRequest,
+            int numOverdraftBuffersAfterDecreasePoolSize,
+            int numRequestedBuffersAfterDecreasePoolSize,

Review Comment:
   Field names may need to be renamed, they are not very clear. Especially, the `numRequestedBuffersAfterDecreasePoolSize`, I thought it's the total number of buffers requested(ordinary + overdraft) by the client from the LocalBufferPool.
   
   How about rename them to `numRequestedOverdraftBuffersAfterDecreasing` and `numRequestedOrdinaryBuffersAfterDecreasing`? The `poolSize` is removed due to the method name has included the `DecreasePoolSize` and the filed name is too long.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -766,13 +776,12 @@ private void returnExcessMemorySegments() {
 
     @GuardedBy("availableMemorySegments")
     private boolean hasExcessBuffers() {
-        return numberOfRequestedOverdraftMemorySegments > 0
-                || numberOfRequestedMemorySegments > currentPoolSize;
+        return numberOfRequestedOverdraftMemorySegments > 0;

Review Comment:
   I have checked, and it's ok from my side. However, I prefer inviting more experts to review this PR, it will be more reliable.
   
   Hi @pnowojski @akalash , would you mind take a look this PR in your free time? thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org