You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/02/28 08:03:29 UTC

[flink] branch release-1.14 updated (77a7d40 -> e4c99b1)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 77a7d40  [FLINK-26285] Fixes an inconsistency which was introduced by c3a6b51 as part of changes done for FLINK-19543
     new 7a6c3c1  [FLINK-25819][runtime] Reordered requesting and recycling buffers in order to avoid race condition in testIsAvailableOrNotAfterRequestAndRecycleMultiSegments
     new 70c8835  [FLINK-25819][runtime] Added new test for 'Insufficient number of network buffers' scenario into NetworkBufferPoolTest
     new e4c99b1  [hotfix][runtime] CodeStyle correction for NetworkBufferPoolTest

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../io/network/buffer/NetworkBufferPoolTest.java   | 91 +++++++++++++---------
 1 file changed, 53 insertions(+), 38 deletions(-)

[flink] 02/03: [FLINK-25819][runtime] Added new test for 'Insufficient number of network buffers' scenario into NetworkBufferPoolTest

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 70c8835070e5dd6a44edb33915f0ef8d611c58fb
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Mon Feb 21 16:21:34 2022 +0100

    [FLINK-25819][runtime] Added new test for 'Insufficient number of network buffers' scenario into NetworkBufferPoolTest
---
 .../io/network/buffer/NetworkBufferPoolTest.java   | 42 ++++++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index cc4d49f..13faa93 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -56,6 +56,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -288,6 +289,47 @@ public class NetworkBufferPoolTest extends TestLogger {
     }
 
     /**
+     * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with the total number of
+     * allocated buffers for several requests exceeding the capacity of {@link NetworkBufferPool}.
+     */
+    @Test
+    public void testInsufficientNumberOfBuffers() throws Exception {
+        final int numberOfSegmentsToRequest = 5;
+
+        final NetworkBufferPool globalPool = new NetworkBufferPool(numberOfSegmentsToRequest, 128);
+
+        try {
+            // the global pool should be in available state initially
+            assertTrue(globalPool.getAvailableFuture().isDone());
+
+            // request 5 segments
+            List<MemorySegment> segments1 =
+                    globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest);
+            assertFalse(globalPool.getAvailableFuture().isDone());
+            assertEquals(numberOfSegmentsToRequest, segments1.size());
+
+            // request only 1 segment
+            IOException ioException =
+                    assertThrows(
+                            IOException.class, () -> globalPool.requestUnpooledMemorySegments(1));
+
+            assertTrue(ioException.getMessage().contains("Insufficient number of network buffers"));
+
+            // recycle 5 segments
+            CompletableFuture<?> availableFuture = globalPool.getAvailableFuture();
+            globalPool.recycleUnpooledMemorySegments(segments1);
+            assertTrue(availableFuture.isDone());
+
+            List<MemorySegment> segments2 =
+                    globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest);
+            assertFalse(globalPool.getAvailableFuture().isDone());
+            assertEquals(numberOfSegmentsToRequest, segments2.size());
+        } finally {
+            globalPool.destroy();
+        }
+    }
+
+    /**
      * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with the invalid argument
      * to cause exception.
      */

[flink] 01/03: [FLINK-25819][runtime] Reordered requesting and recycling buffers in order to avoid race condition in testIsAvailableOrNotAfterRequestAndRecycleMultiSegments

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7a6c3c1b978a75898ff3f09a715e25e7e9e91690
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Mon Feb 21 16:20:48 2022 +0100

    [FLINK-25819][runtime] Reordered requesting and recycling buffers in order to avoid race condition in testIsAvailableOrNotAfterRequestAndRecycleMultiSegments
---
 .../io/network/buffer/NetworkBufferPoolTest.java   | 26 +++++-----------------
 1 file changed, 5 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 73877c9..cc4d49f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -532,9 +532,8 @@ public class NetworkBufferPoolTest extends TestLogger {
      * NetworkBufferPool#requestUnpooledMemorySegments(int)} and recycled by {@link
      * NetworkBufferPool#recycleUnpooledMemorySegments(Collection)}.
      */
-    @Test(timeout = 10000L)
-    public void testIsAvailableOrNotAfterRequestAndRecycleMultiSegments()
-            throws InterruptedException, IOException {
+    @Test
+    public void testIsAvailableOrNotAfterRequestAndRecycleMultiSegments() throws Exception {
         final int numberOfSegmentsToRequest = 5;
         final int numBuffers = 2 * numberOfSegmentsToRequest;
 
@@ -556,29 +555,14 @@ public class NetworkBufferPoolTest extends TestLogger {
             assertFalse(globalPool.getAvailableFuture().isDone());
             assertEquals(numberOfSegmentsToRequest, segments2.size());
 
-            // request another 5 segments
-            final CountDownLatch latch = new CountDownLatch(1);
-            final List<MemorySegment> segments3 = new ArrayList<>(numberOfSegmentsToRequest);
-            CheckedThread asyncRequest =
-                    new CheckedThread() {
-                        @Override
-                        public void go() throws Exception {
-                            // this request should be blocked until at least 5 segments are recycled
-                            segments3.addAll(
-                                    globalPool.requestUnpooledMemorySegments(
-                                            numberOfSegmentsToRequest));
-                            latch.countDown();
-                        }
-                    };
-            asyncRequest.start();
-
             // recycle 5 segments
             CompletableFuture<?> availableFuture = globalPool.getAvailableFuture();
             globalPool.recycleUnpooledMemorySegments(segments1);
             assertTrue(availableFuture.isDone());
 
-            // wait util the third request is fulfilled
-            latch.await();
+            // request another 5 segments
+            final List<MemorySegment> segments3 =
+                    globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest);
             assertFalse(globalPool.getAvailableFuture().isDone());
             assertEquals(numberOfSegmentsToRequest, segments3.size());
 

[flink] 03/03: [hotfix][runtime] CodeStyle correction for NetworkBufferPoolTest

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e4c99b19e9fa876157a98d23b5b5bc498da7fac9
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Mon Feb 21 16:37:58 2022 +0100

    [hotfix][runtime] CodeStyle correction for NetworkBufferPoolTest
---
 .../io/network/buffer/NetworkBufferPoolTest.java   | 23 ++++++----------------
 1 file changed, 6 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 13faa93..e5f878e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -23,10 +23,7 @@ import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -40,13 +37,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasProperty;
 import static org.hamcrest.core.IsCollectionContaining.hasItem;
 import static org.hamcrest.core.IsNot.not;
@@ -55,7 +52,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -63,10 +59,6 @@ import static org.junit.Assert.fail;
 /** Tests for {@link NetworkBufferPool}. */
 public class NetworkBufferPoolTest extends TestLogger {
 
-    @Rule public ExpectedException expectedException = ExpectedException.none();
-
-    @Rule public Timeout timeout = new Timeout(10, TimeUnit.SECONDS);
-
     @Test
     public void testCreatePoolAfterDestroy() {
         try {
@@ -434,10 +426,9 @@ public class NetworkBufferPoolTest extends TestLogger {
 
         segment.free();
 
-        expectedException.expect(IllegalStateException.class);
-        expectedException.expectMessage("destroyed");
         try {
-            asyncRequest.sync();
+            Exception ex = assertThrows(IllegalStateException.class, asyncRequest::sync);
+            assertTrue(ex.getMessage().contains("destroyed"));
         } finally {
             globalPool.destroy();
         }
@@ -518,11 +509,9 @@ public class NetworkBufferPoolTest extends TestLogger {
 
         asyncRequest.start();
 
-        expectedException.expect(IOException.class);
-        expectedException.expectMessage("Timeout");
-
         try {
-            asyncRequest.sync();
+            Exception ex = assertThrows(IOException.class, asyncRequest::sync);
+            assertTrue(ex.getMessage().contains("Timeout"));
         } finally {
             globalPool.destroy();
         }
@@ -683,7 +672,7 @@ public class NetworkBufferPoolTest extends TestLogger {
             // wait until all available buffers are requested
             while (segmentsRequested.size() + segments.size() + exclusiveSegments.size()
                     < numBuffers) {
-                Thread.sleep(100);
+                Thread.sleep(10);
                 assertNull(cause.get());
             }