You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/02/28 08:03:31 UTC
[flink] 02/03: [FLINK-25819][runtime] Added new test for 'Insufficient number of network buffers' scenario into NetworkBufferPoolTest
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 70c8835070e5dd6a44edb33915f0ef8d611c58fb
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Mon Feb 21 16:21:34 2022 +0100
[FLINK-25819][runtime] Added new test for 'Insufficient number of network buffers' scenario into NetworkBufferPoolTest
---
.../io/network/buffer/NetworkBufferPoolTest.java | 42 ++++++++++++++++++++++
1 file changed, 42 insertions(+)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index cc4d49f..13faa93 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -56,6 +56,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -288,6 +289,47 @@ public class NetworkBufferPoolTest extends TestLogger {
}
/**
+ * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with the total number of
+ * allocated buffers for several requests exceeding the capacity of {@link NetworkBufferPool}.
+ */
+ @Test
+ public void testInsufficientNumberOfBuffers() throws Exception {
+ final int numberOfSegmentsToRequest = 5;
+
+ final NetworkBufferPool globalPool = new NetworkBufferPool(numberOfSegmentsToRequest, 128);
+
+ try {
+ // the global pool should be in available state initially
+ assertTrue(globalPool.getAvailableFuture().isDone());
+
+ // request all 5 segments the pool was created with; afterwards it must report unavailable
+ List<MemorySegment> segments1 =
+ globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest);
+ assertFalse(globalPool.getAvailableFuture().isDone());
+ assertEquals(numberOfSegmentsToRequest, segments1.size());
+
+ // with every segment handed out, requesting even 1 more must fail with an IOException
+ IOException ioException =
+ assertThrows(
+ IOException.class, () -> globalPool.requestUnpooledMemorySegments(1));
+
+ assertTrue(ioException.getMessage().contains("Insufficient number of network buffers"));
+
+ // recycle all 5 segments; the future captured before recycling must then be completed
+ CompletableFuture<?> availableFuture = globalPool.getAvailableFuture();
+ globalPool.recycleUnpooledMemorySegments(segments1);
+ assertTrue(availableFuture.isDone());
+
+ List<MemorySegment> segments2 =
+ globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest);
+ assertFalse(globalPool.getAvailableFuture().isDone());
+ assertEquals(numberOfSegmentsToRequest, segments2.size());
+ } finally {
+ globalPool.destroy();
+ }
+ }
+
+ /**
* Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with the invalid argument
* to cause exception.
*/