You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yi...@apache.org on 2022/02/04 11:00:00 UTC

[flink] branch release-1.14 updated: [FLINK-25653][network] Move buffer recycle in SortMergeSubpartitionReader out of lock to avoid deadlock

This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new fdbf60f  [FLINK-25653][network] Move buffer recycle in SortMergeSubpartitionReader out of lock to avoid deadlock
fdbf60f is described below

commit fdbf60f52e2b2eaf3fdb474b4624d0857f892824
Author: Yuxin Tan <ta...@gmail.com>
AuthorDate: Wed Jan 26 10:42:25 2022 +0800

    [FLINK-25653][network] Move buffer recycle in SortMergeSubpartitionReader out of lock to avoid deadlock
    
    In the current sort-shuffle implementation, SortMergeSubpartitionReader and SortMergeResultPartitionReadScheduler acquire their locks in different orders, which may cause a deadlock. To solve the problem, this change moves buffer recycling in SortMergeSubpartitionReader out of the lock.
    
    This closes #18551.
---
 .../partition/SortMergeSubpartitionReader.java     | 34 ++++++++------
 .../SortMergeResultPartitionReadSchedulerTest.java | 52 +++++++++++++++++-----
 2 files changed, 63 insertions(+), 23 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
index cd04d6f..4644a98 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
@@ -28,6 +28,8 @@ import javax.annotation.concurrent.GuardedBy;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 
@@ -97,21 +99,27 @@ class SortMergeSubpartitionReader
     }
 
     private void addBuffer(Buffer buffer) {
-        boolean notifyAvailable;
+        boolean notifyAvailable = false;
+        boolean needRecycleBuffer = false;
+
         synchronized (lock) {
             if (isReleased) {
-                buffer.recycleBuffer();
-                throw new IllegalStateException("Subpartition reader has been already released.");
-            }
-
-            notifyAvailable = buffersRead.isEmpty();
+                needRecycleBuffer = true;
+            } else {
+                notifyAvailable = buffersRead.isEmpty();
 
-            buffersRead.add(buffer);
-            if (buffer.isBuffer()) {
-                ++dataBufferBacklog;
+                buffersRead.add(buffer);
+                if (buffer.isBuffer()) {
+                    ++dataBufferBacklog;
+                }
             }
         }
 
+        if (needRecycleBuffer) {
+            buffer.recycleBuffer();
+            throw new IllegalStateException("Subpartition reader has been already released.");
+        }
+
         if (notifyAvailable) {
             notifyDataAvailable();
         }
@@ -171,6 +179,7 @@ class SortMergeSubpartitionReader
     }
 
     private void releaseInternal(@Nullable Throwable throwable) {
+        List<Buffer> buffersToRecycle;
         synchronized (lock) {
             if (isReleased) {
                 return;
@@ -180,13 +189,12 @@ class SortMergeSubpartitionReader
             if (failureCause == null) {
                 failureCause = throwable;
             }
-
-            for (Buffer buffer : buffersRead) {
-                buffer.recycleBuffer();
-            }
+            buffersToRecycle = new ArrayList<>(buffersRead);
             buffersRead.clear();
             dataBufferBacklog = 0;
         }
+        buffersToRecycle.forEach(Buffer::recycleBuffer);
+        buffersToRecycle.clear();
 
         releaseFuture.complete(null);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index 4650630..57015d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.util.ExceptionUtils;
@@ -37,6 +38,7 @@ import java.nio.channels.FileChannel;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.time.Duration;
+import java.util.ArrayDeque;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
@@ -69,6 +71,12 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
 
     private PartitionedFile partitionedFile;
 
+    private PartitionedFileReader fileReader;
+
+    private FileChannel dataFileChannel;
+
+    private FileChannel indexFileChannel;
+
     private BatchShuffleReadBufferPool bufferPool;
 
     private ExecutorService executor;
@@ -90,13 +98,19 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
                         numBuffersPerSubpartition,
                         bufferSize,
                         dataBytes);
+        dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
+        indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
+        fileReader =
+                new PartitionedFileReader(partitionedFile, 0, dataFileChannel, indexFileChannel);
         bufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize);
         executor = Executors.newFixedThreadPool(numThreads);
         readScheduler = new SortMergeResultPartitionReadScheduler(bufferPool, executor, this);
     }
 
     @After
-    public void after() {
+    public void after() throws Exception {
+        dataFileChannel.close();
+        indexFileChannel.close();
         partitionedFile.deleteQuietly();
         bufferPool.destroy();
         executor.shutdown();
@@ -203,6 +217,33 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
         assertAllResourcesReleased();
     }
 
+    @Test(timeout = 60000)
+    public void testNoDeadlockWhenReadAndReleaseBuffers() throws Exception {
+        SortMergeSubpartitionReader subpartitionReader =
+                new SortMergeSubpartitionReader(new NoOpBufferAvailablityListener(), fileReader);
+        Thread readAndReleaseThread =
+                new Thread(
+                        () -> {
+                            Queue<MemorySegment> segments = new ArrayDeque<>();
+                            segments.add(MemorySegmentFactory.allocateUnpooledSegment(bufferSize));
+                            try {
+                                assertTrue(fileReader.hasRemaining());
+                                subpartitionReader.readBuffers(segments, readScheduler);
+                                subpartitionReader.releaseAllResources();
+                                subpartitionReader.readBuffers(segments, readScheduler);
+                            } catch (Exception ignore) {
+                            }
+                        });
+
+        synchronized (this) {
+            readAndReleaseThread.start();
+            do {
+                Thread.sleep(100);
+            } while (!subpartitionReader.isReleased());
+        }
+        readAndReleaseThread.join();
+    }
+
     @Test
     public void testRequestBufferTimeoutAndFailed() throws Exception {
         Duration bufferRequestTimeout = Duration.ofSeconds(3);
@@ -238,13 +279,6 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
         SortMergeResultPartitionReadScheduler readScheduler =
                 new SortMergeResultPartitionReadScheduler(
                         bufferPool, executor, this, bufferRequestTimeout);
-
-        FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
-        FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
-
-        PartitionedFileReader fileReader =
-                new PartitionedFileReader(partitionedFile, 0, dataFileChannel, indexFileChannel);
-
         SortMergeSubpartitionReader subpartitionReader =
                 new SortMergeSubpartitionReader(new NoOpBufferAvailablityListener(), fileReader);
 
@@ -261,8 +295,6 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
 
         bufferPool.recycle(allocatedBuffers);
         bufferPool.destroy();
-        dataFileChannel.close();
-        indexFileChannel.close();
         readScheduler.release();
     }