You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/02/28 08:03:30 UTC
[flink] 01/03: [FLINK-25819][runtime] Reordered requesting and recycling buffers in order to avoid race condition in testIsAvailableOrNotAfterRequestAndRecycleMultiSegments
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7a6c3c1b978a75898ff3f09a715e25e7e9e91690
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Mon Feb 21 16:20:48 2022 +0100
[FLINK-25819][runtime] Reordered requesting and recycling buffers in order to avoid race condition in testIsAvailableOrNotAfterRequestAndRecycleMultiSegments
---
.../io/network/buffer/NetworkBufferPoolTest.java | 26 +++++-----------------
1 file changed, 5 insertions(+), 21 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 73877c9..cc4d49f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -532,9 +532,8 @@ public class NetworkBufferPoolTest extends TestLogger {
* NetworkBufferPool#requestUnpooledMemorySegments(int)} and recycled by {@link
* NetworkBufferPool#recycleUnpooledMemorySegments(Collection)}.
*/
- @Test(timeout = 10000L)
- public void testIsAvailableOrNotAfterRequestAndRecycleMultiSegments()
- throws InterruptedException, IOException {
+ @Test
+ public void testIsAvailableOrNotAfterRequestAndRecycleMultiSegments() throws Exception {
final int numberOfSegmentsToRequest = 5;
final int numBuffers = 2 * numberOfSegmentsToRequest;
@@ -556,29 +555,14 @@ public class NetworkBufferPoolTest extends TestLogger {
assertFalse(globalPool.getAvailableFuture().isDone());
assertEquals(numberOfSegmentsToRequest, segments2.size());
- // request another 5 segments
- final CountDownLatch latch = new CountDownLatch(1);
- final List<MemorySegment> segments3 = new ArrayList<>(numberOfSegmentsToRequest);
- CheckedThread asyncRequest =
- new CheckedThread() {
- @Override
- public void go() throws Exception {
- // this request should be blocked until at least 5 segments are recycled
- segments3.addAll(
- globalPool.requestUnpooledMemorySegments(
- numberOfSegmentsToRequest));
- latch.countDown();
- }
- };
- asyncRequest.start();
-
// recycle 5 segments
CompletableFuture<?> availableFuture = globalPool.getAvailableFuture();
globalPool.recycleUnpooledMemorySegments(segments1);
assertTrue(availableFuture.isDone());
- // wait util the third request is fulfilled
- latch.await();
+ // request another 5 segments
+ final List<MemorySegment> segments3 =
+ globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest);
assertFalse(globalPool.getAvailableFuture().isDone());
assertEquals(numberOfSegmentsToRequest, segments3.size());