You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/02/28 08:03:30 UTC

[flink] 01/03: [FLINK-25819][runtime] Reordered requesting and recycling buffers in order to avoid race condition in testIsAvailableOrNotAfterRequestAndRecycleMultiSegments

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7a6c3c1b978a75898ff3f09a715e25e7e9e91690
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Mon Feb 21 16:20:48 2022 +0100

    [FLINK-25819][runtime] Reordered requesting and recycling buffers in order to avoid race condition in testIsAvailableOrNotAfterRequestAndRecycleMultiSegments
---
 .../io/network/buffer/NetworkBufferPoolTest.java   | 26 +++++-----------------
 1 file changed, 5 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 73877c9..cc4d49f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -532,9 +532,8 @@ public class NetworkBufferPoolTest extends TestLogger {
      * NetworkBufferPool#requestUnpooledMemorySegments(int)} and recycled by {@link
      * NetworkBufferPool#recycleUnpooledMemorySegments(Collection)}.
      */
-    @Test(timeout = 10000L)
-    public void testIsAvailableOrNotAfterRequestAndRecycleMultiSegments()
-            throws InterruptedException, IOException {
+    @Test
+    public void testIsAvailableOrNotAfterRequestAndRecycleMultiSegments() throws Exception {
         final int numberOfSegmentsToRequest = 5;
         final int numBuffers = 2 * numberOfSegmentsToRequest;
 
@@ -556,29 +555,14 @@ public class NetworkBufferPoolTest extends TestLogger {
             assertFalse(globalPool.getAvailableFuture().isDone());
             assertEquals(numberOfSegmentsToRequest, segments2.size());
 
-            // request another 5 segments
-            final CountDownLatch latch = new CountDownLatch(1);
-            final List<MemorySegment> segments3 = new ArrayList<>(numberOfSegmentsToRequest);
-            CheckedThread asyncRequest =
-                    new CheckedThread() {
-                        @Override
-                        public void go() throws Exception {
-                            // this request should be blocked until at least 5 segments are recycled
-                            segments3.addAll(
-                                    globalPool.requestUnpooledMemorySegments(
-                                            numberOfSegmentsToRequest));
-                            latch.countDown();
-                        }
-                    };
-            asyncRequest.start();
-
             // recycle 5 segments
             CompletableFuture<?> availableFuture = globalPool.getAvailableFuture();
             globalPool.recycleUnpooledMemorySegments(segments1);
             assertTrue(availableFuture.isDone());
 
-            // wait util the third request is fulfilled
-            latch.await();
+            // request another 5 segments
+            final List<MemorySegment> segments3 =
+                    globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest);
             assertFalse(globalPool.getAvailableFuture().isDone());
             assertEquals(numberOfSegmentsToRequest, segments3.size());