You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/02/28 08:03:31 UTC

[flink] 02/03: [FLINK-25819][runtime] Added new test for 'Insufficient number of network buffers' scenario into NetworkBufferPoolTest

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 70c8835070e5dd6a44edb33915f0ef8d611c58fb
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Mon Feb 21 16:21:34 2022 +0100

    [FLINK-25819][runtime] Added new test for 'Insufficient number of network buffers' scenario into NetworkBufferPoolTest
---
 .../io/network/buffer/NetworkBufferPoolTest.java   | 42 ++++++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index cc4d49f..13faa93 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -56,6 +56,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -288,6 +289,47 @@ public class NetworkBufferPoolTest extends TestLogger {
     }
 
     /**
+     * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with the total number of
+     * allocated buffers for several requests exceeding the capacity of {@link NetworkBufferPool}.
+     */
+    @Test
+    public void testInsufficientNumberOfBuffers() throws Exception {
+        final int numberOfSegmentsToRequest = 5;
+
+        final NetworkBufferPool globalPool = new NetworkBufferPool(numberOfSegmentsToRequest, 128);
+
+        try {
+            // the global pool should be in available state initially
+            assertTrue(globalPool.getAvailableFuture().isDone());
+
+            // request 5 segments
+            List<MemorySegment> segments1 =
+                    globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest);
+            assertFalse(globalPool.getAvailableFuture().isDone());
+            assertEquals(numberOfSegmentsToRequest, segments1.size());
+
+            // request only 1 segment
+            IOException ioException =
+                    assertThrows(
+                            IOException.class, () -> globalPool.requestUnpooledMemorySegments(1));
+
+            assertTrue(ioException.getMessage().contains("Insufficient number of network buffers"));
+
+            // recycle 5 segments
+            CompletableFuture<?> availableFuture = globalPool.getAvailableFuture();
+            globalPool.recycleUnpooledMemorySegments(segments1);
+            assertTrue(availableFuture.isDone());
+
+            List<MemorySegment> segments2 =
+                    globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest);
+            assertFalse(globalPool.getAvailableFuture().isDone());
+            assertEquals(numberOfSegmentsToRequest, segments2.size());
+        } finally {
+            globalPool.destroy();
+        }
+    }
+
+    /**
      * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with the invalid argument
      * to cause exception.
      */