You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/13 09:02:49 UTC
[flink] 01/02: [hotfix] Avoid possible deadlock in SubpartitionView and FileDataManager.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
commit efc9eb763284872615bd9f8ec2199efbdcec1d5e
Author: Weijie Guo <re...@163.com>
AuthorDate: Fri Sep 9 15:42:51 2022 +0800
[hotfix] Avoid possible deadlock in SubpartitionView and FileDataManager.
---
.../partition/hybrid/HsSubpartitionView.java | 11 ++--
.../partition/hybrid/HsFileDataManagerTest.java | 70 +++++++++++++++++++++-
2 files changed, 75 insertions(+), 6 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
index 704d52703a3..bc9ec5a1744 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
@@ -73,8 +73,8 @@ public class HsSubpartitionView
@Nullable
@Override
public BufferAndBacklog getNextBuffer() {
- synchronized (lock) {
- try {
+ try {
+ synchronized (lock) {
checkNotNull(diskDataView, "disk data view must be not null.");
checkNotNull(memoryDataView, "memory data view must be not null.");
@@ -84,10 +84,11 @@ public class HsSubpartitionView
}
updateConsumingStatus(bufferToConsume);
return bufferToConsume.map(this::handleBacklog).orElse(null);
- } catch (Throwable cause) {
- releaseInternal(cause);
- return null;
}
+ } catch (Throwable cause) {
+ // release subpartition reader outside of lock to avoid deadlock.
+ releaseInternal(cause);
+ return null;
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
index cdceb338042..4c168c13fd6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorSer
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.function.BiConsumerWithException;
@@ -340,6 +342,66 @@ class HsFileDataManagerTest {
.hasMessageContaining("HsFileDataManager is already released.");
}
+ /**
+ * When the result partition fails, the view lock may be acquired while the FileDataManager
+ * lock is held. At the same time, a downstream (consumer) thread may try to acquire the
+ * FileDataManager lock while holding the view lock. To avoid this deadlock, the subpartition
+ * view must release the subpartition reader (and the reader must fail) outside the view lock.
+ */
+ @Test
+ void testConsumeWhileReleaseNoDeadlock() throws Exception {
+ CompletableFuture<Void> consumerStart = new CompletableFuture<>();
+ CompletableFuture<Void> readerFail = new CompletableFuture<>();
+ HsSubpartitionView subpartitionView =
+ new HsSubpartitionView(new NoOpBufferAvailablityListener());
+
+ HsSubpartitionFileReaderImpl subpartitionFileReader =
+ new HsSubpartitionFileReaderImpl(
+ 0,
+ dataFileChannel,
+ subpartitionView,
+ new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
+ 5,
+ fileDataManager::releaseSubpartitionReader,
+ BufferReaderWriterUtil.allocatedHeaderBuffer()) {
+ @Override
+ public synchronized void fail(Throwable failureCause) {
+ try {
+ readerFail.complete(null); // fail() has been entered: let the consumer thread start
+ consumerStart.get(); // wait until the consumer is about to call getNextBuffer()
+ super.fail(failureCause);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ factory.allReaders.add(subpartitionFileReader);
+ HsDataView diskDataView = fileDataManager.registerNewSubpartition(0, subpartitionView);
+ subpartitionView.setDiskDataView(diskDataView);
+ TestingHsDataView memoryDataView =
+ TestingHsDataView.builder()
+ .setConsumeBufferFunction(
+ (ignore) -> {
+ // throw so that getNextBuffer() releases the subpartition (file) reader.
+ throw new RuntimeException("expected exception.");
+ })
+ .build();
+ subpartitionView.setMemoryDataView(memoryDataView);
+
+ CheckedThread consumerThread =
+ new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ readerFail.get(); // wait until the reader's fail() has been entered
+ consumerStart.complete(null); // let fail() continue concurrently with getNextBuffer()
+ subpartitionView.getNextBuffer();
+ }
+ };
+ consumerThread.start();
+ fileDataManager.release(); // expected to call fail() on the reader above
+ consumerThread.sync();
+ }
+
private static FileChannel openFileChannel(Path path) throws IOException {
return FileChannel.open(path, StandardOpenOption.READ);
}
@@ -352,6 +414,8 @@ class HsFileDataManagerTest {
private Consumer<Throwable> failConsumer = (ignore) -> {};
+ private Runnable releaseDataViewRunnable = () -> {};
+
private final Queue<MemorySegment> readBuffers;
private int priority;
@@ -400,6 +464,10 @@ class HsFileDataManagerTest {
this.failConsumer = failConsumer;
}
+ public void setReleaseDataViewRunnable(Runnable releaseDataViewRunnable) {
+ this.releaseDataViewRunnable = releaseDataViewRunnable; // run by releaseDataView()
+ }
+
@Override
public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(
int nextBufferToConsume) {
@@ -418,7 +486,7 @@ class HsFileDataManagerTest {
@Override
public void releaseDataView() {
- // do nothing.
+ releaseDataViewRunnable.run(); // no-op unless replaced via setReleaseDataViewRunnable
}
/** Factory for {@link TestingHsSubpartitionFileReader}. */