You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/01/13 14:11:19 UTC

[flink] 02/02: [FLINK-25407][network] Fix the issues caused by FLINK-24035

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1ea2a7a5c90288dff08f702bac71de4d91c00f6f
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Thu Dec 30 21:56:15 2021 +0800

    [FLINK-25407][network] Fix the issues caused by FLINK-24035
    
    This PR tries to fix the issues caused by FLINK-24035. More specifically, there are two issues: the first is a deadlock caused by acquiring the 'factoryLock' in NetworkBufferPool, and the second is the incorrect decrease of NetworkBufferPool's required segment count. Both issues occur during exception handling when requesting segments. In fact, when reserving memory segments for a LocalBufferPool, there is no need to modify the required segment count at all. As a result, there is no n [...]
    
    This closes #18173.
---
 .../io/network/buffer/NetworkBufferPool.java       | 17 ++++---
 .../io/network/buffer/LocalBufferPoolTest.java     | 59 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 509db03..d9717a4 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -208,7 +208,13 @@ public class NetworkBufferPool
             tryRedistributeBuffers(numberOfSegmentsToRequest);
         }
 
-        return internalRequestMemorySegments(numberOfSegmentsToRequest);
+        try {
+            return internalRequestMemorySegments(numberOfSegmentsToRequest);
+        } catch (IOException exception) {
+            revertRequiredBuffers(numberOfSegmentsToRequest);
+            ExceptionUtils.rethrowIOException(exception);
+            return null;
+        }
     }
 
     private List<MemorySegment> internalRequestMemorySegments(int numberOfSegmentsToRequest)
@@ -248,7 +254,7 @@ public class NetworkBufferPool
                 }
             }
         } catch (Throwable e) {
-            recycleMemorySegments(segments, numberOfSegmentsToRequest);
+            internalRecycleMemorySegments(segments);
             ExceptionUtils.rethrowIOException(e);
         }
 
@@ -272,12 +278,11 @@ public class NetworkBufferPool
      */
     @Override
     public void recycleUnpooledMemorySegments(Collection<MemorySegment> segments) {
-        recycleMemorySegments(segments, segments.size());
-    }
-
-    private void recycleMemorySegments(Collection<MemorySegment> segments, int size) {
         internalRecycleMemorySegments(segments);
+        revertRequiredBuffers(segments.size());
+    }
 
+    private void revertRequiredBuffers(int size) {
         synchronized (factoryLock) {
             numTotalRequiredBuffers -= size;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index f77275a..27ad673 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -127,6 +127,65 @@ public class LocalBufferPoolTest extends TestLogger {
         }
     }
 
+    @Test(timeout = 10000) // timeout can indicate a potential deadlock
+    public void testReserveSegmentsAndCancel() throws Exception {
+        int totalSegments = 4;
+        int segmentsToReserve = 2;
+
+        NetworkBufferPool globalPool = new NetworkBufferPool(totalSegments, memorySegmentSize);
+        BufferPool localPool1 = globalPool.createBufferPool(segmentsToReserve, totalSegments);
+        List<MemorySegment> segments = new ArrayList<>();
+
+        try {
+            for (int i = 0; i < totalSegments; ++i) {
+                segments.add(localPool1.requestMemorySegmentBlocking());
+            }
+
+            BufferPool localPool2 = globalPool.createBufferPool(segmentsToReserve, totalSegments);
+            // the segment reserve thread will be blocked for no buffer is available
+            Thread reserveThread =
+                    new Thread(
+                            () -> {
+                                try {
+                                    localPool2.reserveSegments(segmentsToReserve);
+                                } catch (Throwable ignored) {
+                                }
+                            });
+            reserveThread.start();
+            Thread.sleep(100); // wait to be blocked
+
+            // the cancel thread can be blocked when redistributing buffers
+            Thread cancelThread =
+                    new Thread(
+                            () -> {
+                                localPool1.lazyDestroy();
+                                localPool2.lazyDestroy();
+                            });
+            cancelThread.start();
+
+            // it is expected that the segment reserve thread can be cancelled successfully
+            Thread interruptThread =
+                    new Thread(
+                            () -> {
+                                try {
+                                    do {
+                                        reserveThread.interrupt();
+                                        Thread.sleep(100);
+                                    } while (reserveThread.isAlive() || cancelThread.isAlive());
+                                } catch (Throwable ignored) {
+                                }
+                            });
+            interruptThread.start();
+
+            interruptThread.join();
+        } finally {
+            segments.forEach(localPool1::recycle);
+            localPool1.lazyDestroy();
+            assertEquals(0, globalPool.getNumberOfUsedMemorySegments());
+            globalPool.destroy();
+        }
+    }
+
     @Test
     public void testRequestMoreThanAvailable() {
         localBufferPool.setNumBuffers(numBuffers);