You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/13 09:02:49 UTC

[flink] 01/02: [hotfix] Avoid possible deadlock in SubpartitionView and FileDataManager.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit efc9eb763284872615bd9f8ec2199efbdcec1d5e
Author: Weijie Guo <re...@163.com>
AuthorDate: Fri Sep 9 15:42:51 2022 +0800

    [hotfix] Avoid possible deadlock in SubpartitionView and FileDataManager.
---
 .../partition/hybrid/HsSubpartitionView.java       | 11 ++--
 .../partition/hybrid/HsFileDataManagerTest.java    | 70 +++++++++++++++++++++-
 2 files changed, 75 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
index 704d52703a3..bc9ec5a1744 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
@@ -73,8 +73,8 @@ public class HsSubpartitionView
     @Nullable
     @Override
     public BufferAndBacklog getNextBuffer() {
-        synchronized (lock) {
-            try {
+        try {
+            synchronized (lock) {
                 checkNotNull(diskDataView, "disk data view must be not null.");
                 checkNotNull(memoryDataView, "memory data view must be not null.");
 
@@ -84,10 +84,11 @@ public class HsSubpartitionView
                 }
                 updateConsumingStatus(bufferToConsume);
                 return bufferToConsume.map(this::handleBacklog).orElse(null);
-            } catch (Throwable cause) {
-                releaseInternal(cause);
-                return null;
             }
+        } catch (Throwable cause) {
+            // release subpartition reader outside of lock to avoid deadlock.
+            releaseInternal(cause);
+            return null;
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
index cdceb338042..4c168c13fd6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorSer
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
 import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.function.BiConsumerWithException;
@@ -340,6 +342,66 @@ class HsFileDataManagerTest {
                 .hasMessageContaining("HsFileDataManager is already released.");
     }
 
+    /**
+     * When the result partition fails, the view lock may be acquired while the FileDataManager
+     * lock is held. Meanwhile, the downstream thread may acquire the FileDataManager lock while
+     * holding the view lock. To avoid this deadlock, neither the view releasing the subpartition
+     * reader nor the subpartition reader failing may happen inside the lock.
+     */
+    @Test
+    void testConsumeWhileReleaseNoDeadlock() throws Exception {
+        CompletableFuture<Void> consumerStart = new CompletableFuture<>();
+        CompletableFuture<Void> readerFail = new CompletableFuture<>();
+        HsSubpartitionView subpartitionView =
+                new HsSubpartitionView(new NoOpBufferAvailablityListener());
+
+        HsSubpartitionFileReaderImpl subpartitionFileReader =
+                new HsSubpartitionFileReaderImpl(
+                        0,
+                        dataFileChannel,
+                        subpartitionView,
+                        new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
+                        5,
+                        fileDataManager::releaseSubpartitionReader,
+                        BufferReaderWriterUtil.allocatedHeaderBuffer()) {
+                    @Override
+                    public synchronized void fail(Throwable failureCause) {
+                        try {
+                            readerFail.complete(null); // signal that fail() was entered
+                            consumerStart.get(); // stall until the consumer starts
+                            super.fail(failureCause); // races with getNextBuffer()
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                };
+        factory.allReaders.add(subpartitionFileReader);
+        HsDataView diskDataView = fileDataManager.registerNewSubpartition(0, subpartitionView);
+        subpartitionView.setDiskDataView(diskDataView);
+        TestingHsDataView memoryDataView =
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (ignore) -> {
+                                    // throwing makes getNextBuffer() release the file reader.
+                                    throw new RuntimeException("expected exception.");
+                                })
+                        .build();
+        subpartitionView.setMemoryDataView(memoryDataView);
+
+        CheckedThread consumerThread =
+                new CheckedThread() {
+                    @Override
+                    public void go() throws Exception {
+                        readerFail.get(); // wait until the reader's fail() is entered
+                        consumerStart.complete(null); // let fail() proceed
+                        subpartitionView.getNextBuffer(); // consume fails, releasing the reader
+                    }
+                };
+        consumerThread.start();
+        fileDataManager.release(); // may hang here if the deadlock regresses
+        consumerThread.sync();
+    }
+
     private static FileChannel openFileChannel(Path path) throws IOException {
         return FileChannel.open(path, StandardOpenOption.READ);
     }
@@ -352,6 +414,8 @@ class HsFileDataManagerTest {
 
         private Consumer<Throwable> failConsumer = (ignore) -> {};
 
+        private Runnable releaseDataViewRunnable = () -> {};
+
         private final Queue<MemorySegment> readBuffers;
 
         private int priority;
@@ -400,6 +464,10 @@ class HsFileDataManagerTest {
             this.failConsumer = failConsumer;
         }
 
+        public void setReleaseDataViewRunnable(Runnable releaseDataViewRunnable) {
+            this.releaseDataViewRunnable = releaseDataViewRunnable; // run by releaseDataView()
+        }
+
         @Override
         public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(
                 int nextBufferToConsume) {
@@ -418,7 +486,7 @@ class HsFileDataManagerTest {
 
         @Override
         public void releaseDataView() {
-            // do nothing.
+            releaseDataViewRunnable.run(); // no-op unless a test installs a hook
         }
 
         /** Factory for {@link TestingHsSubpartitionFileReader}. */