You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "1996fanrui (via GitHub)" <gi...@apache.org> on 2023/03/07 04:03:11 UTC

[GitHub] [flink] 1996fanrui commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

1996fanrui commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1127315078


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -242,23 +243,129 @@ public void testRecycleAfterDestroy() {
         localBufferPool.lazyDestroy();
 
         // All buffers have been requested, but can not be returned yet.
-        assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+        assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
         // Recycle should return buffers to memory segment pool
         for (Buffer buffer : requests) {
             buffer.recycleBuffer();
         }
     }
 
+    @Test
+    void testDecreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 4, 10, 0, Integer.MAX_VALUE, 2);
+        Queue<MemorySegment> buffers = new LinkedList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(5);
+
+        // request all buffer.
+        for (int i = 0; i < 5; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 1 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set pool size to 4.
+        bufferPool.setNumBuffers(4);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(4);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return all overdraft buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return the excess buffer.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.isAvailable()).isFalse();
+        // return non-excess buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isTrue();
+
+        while (!buffers.isEmpty()) {
+            bufferPool.recycle(buffers.poll());
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    @Test
+    void testIncreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 5, 100, 0, Integer.MAX_VALUE, 2);
+        List<MemorySegment> buffers = new ArrayList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);

Review Comment:
   These magic number can be changed to the filed, such as, `oldPoolSize`, `newPoolSize`, `maxOverdraftSize`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -242,23 +243,129 @@ public void testRecycleAfterDestroy() {
         localBufferPool.lazyDestroy();
 
         // All buffers have been requested, but can not be returned yet.
-        assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+        assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
         // Recycle should return buffers to memory segment pool
         for (Buffer buffer : requests) {
             buffer.recycleBuffer();
         }
     }
 
+    @Test
+    void testDecreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 4, 10, 0, Integer.MAX_VALUE, 2);
+        Queue<MemorySegment> buffers = new LinkedList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(5);
+
+        // request all buffer.
+        for (int i = 0; i < 5; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 1 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set pool size to 4.
+        bufferPool.setNumBuffers(4);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(4);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return all overdraft buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return the excess buffer.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.isAvailable()).isFalse();
+        // return non-excess buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isTrue();
+
+        while (!buffers.isEmpty()) {
+            bufferPool.recycle(buffers.poll());
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    @Test
+    void testIncreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 5, 100, 0, Integer.MAX_VALUE, 2);
+        List<MemorySegment> buffers = new ArrayList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(5);
+
+        // request all buffer.
+        for (int i = 0; i < 5; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 2 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.requestMemorySegment()).isNull();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set pool size to 10.
+        bufferPool.setNumBuffers(10);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(10);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        // available status will not be influenced by overdraft.
+        assertThat(bufferPool.isAvailable()).isTrue();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.isAvailable()).isTrue();

Review Comment:
   There may be bug here.
   
   I change the 10 to 7, it should be unavailable, however, it's available. 
   
   `bufferPool.setNumBuffers(7);` requested new buffer from global buffer pool to the `availableMemorySegments`.
   
   Call stack is as follow :
   
   ```
   	  at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:442)
   	  at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.checkAvailability(LocalBufferPool.java:551)
   	  at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.checkAndUpdateAvailability(LocalBufferPool.java:524)
   	  at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:682)
   	  - locked <0xa5d> (a java.util.ArrayDeque)
   	  at org.apache.flink.runtime.io.network.buffer.LocalBufferPoolTest.testIncreasePoolSize(LocalBufferPoolTest.java:331)
   ```
   
   I think `isRequestedSizeReached` should be updated.
   
   
   ```
       private boolean isRequestedSizeReached() {
           return numberOfRequestedMemorySegments + numberOfRequestedOverdraftMemorySegments
                   >= currentPoolSize;
       }
   ```
   
   WDYT?
   
   BYW, we should add more tests here, such as : newPoolSize=7.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -242,23 +243,129 @@ public void testRecycleAfterDestroy() {
         localBufferPool.lazyDestroy();
 
         // All buffers have been requested, but can not be returned yet.
-        assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+        assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
         // Recycle should return buffers to memory segment pool
         for (Buffer buffer : requests) {
             buffer.recycleBuffer();
         }
     }
 
+    @Test
+    void testDecreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 4, 10, 0, Integer.MAX_VALUE, 2);
+        Queue<MemorySegment> buffers = new LinkedList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(5);
+
+        // request all buffer.
+        for (int i = 0; i < 5; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 1 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set pool size to 4.
+        bufferPool.setNumBuffers(4);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(4);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return all overdraft buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return the excess buffer.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.isAvailable()).isFalse();
+        // return non-excess buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isTrue();
+
+        while (!buffers.isEmpty()) {
+            bufferPool.recycle(buffers.poll());
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    @Test
+    void testIncreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 5, 100, 0, Integer.MAX_VALUE, 2);
+        List<MemorySegment> buffers = new ArrayList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);

Review Comment:
   testDecreasePoolSize is similar as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org