You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by yi...@apache.org on 2022/02/04 11:00:00 UTC
[flink] branch release-1.14 updated: [FLINK-25653][network] Move buffer recycle in SortMergeSubpartitionReader out of lock to avoid deadlock
This is an automated email from the ASF dual-hosted git repository.
yingjie pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new fdbf60f [FLINK-25653][network] Move buffer recycle in SortMergeSubpartitionReader out of lock to avoid deadlock
fdbf60f is described below
commit fdbf60f52e2b2eaf3fdb474b4624d0857f892824
Author: Yuxin Tan <ta...@gmail.com>
AuthorDate: Wed Jan 26 10:42:25 2022 +0800
[FLINK-25653][network] Move buffer recycle in SortMergeSubpartitionReader out of lock to avoid deadlock
In the current sort-shuffle implementation, SortMergeSubpartitionReader and SortMergeResultPartitionReadScheduler acquire their locks in different orders, which may cause a deadlock. To solve the problem, this change moves buffer recycling in SortMergeSubpartitionReader out of the lock.
This closes #18551.
---
.../partition/SortMergeSubpartitionReader.java | 34 ++++++++------
.../SortMergeResultPartitionReadSchedulerTest.java | 52 +++++++++++++++++-----
2 files changed, 63 insertions(+), 23 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
index cd04d6f..4644a98 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
@@ -28,6 +28,8 @@ import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
@@ -97,21 +99,27 @@ class SortMergeSubpartitionReader
}
private void addBuffer(Buffer buffer) {
- boolean notifyAvailable;
+ boolean notifyAvailable = false;
+ boolean needRecycleBuffer = false;
+
synchronized (lock) {
if (isReleased) {
- buffer.recycleBuffer();
- throw new IllegalStateException("Subpartition reader has been already released.");
- }
-
- notifyAvailable = buffersRead.isEmpty();
+ needRecycleBuffer = true;
+ } else {
+ notifyAvailable = buffersRead.isEmpty();
- buffersRead.add(buffer);
- if (buffer.isBuffer()) {
- ++dataBufferBacklog;
+ buffersRead.add(buffer);
+ if (buffer.isBuffer()) {
+ ++dataBufferBacklog;
+ }
}
}
+ if (needRecycleBuffer) {
+ buffer.recycleBuffer();
+ throw new IllegalStateException("Subpartition reader has been already released.");
+ }
+
if (notifyAvailable) {
notifyDataAvailable();
}
@@ -171,6 +179,7 @@ class SortMergeSubpartitionReader
}
private void releaseInternal(@Nullable Throwable throwable) {
+ List<Buffer> buffersToRecycle;
synchronized (lock) {
if (isReleased) {
return;
@@ -180,13 +189,12 @@ class SortMergeSubpartitionReader
if (failureCause == null) {
failureCause = throwable;
}
-
- for (Buffer buffer : buffersRead) {
- buffer.recycleBuffer();
- }
+ buffersToRecycle = new ArrayList<>(buffersRead);
buffersRead.clear();
dataBufferBacklog = 0;
}
+ buffersToRecycle.forEach(Buffer::recycleBuffer);
+ buffersToRecycle.clear();
releaseFuture.complete(null);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index 4650630..57015d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.util.ExceptionUtils;
@@ -37,6 +38,7 @@ import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
+import java.util.ArrayDeque;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
@@ -69,6 +71,12 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
private PartitionedFile partitionedFile;
+ private PartitionedFileReader fileReader;
+
+ private FileChannel dataFileChannel;
+
+ private FileChannel indexFileChannel;
+
private BatchShuffleReadBufferPool bufferPool;
private ExecutorService executor;
@@ -90,13 +98,19 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
numBuffersPerSubpartition,
bufferSize,
dataBytes);
+ dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
+ indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
+ fileReader =
+ new PartitionedFileReader(partitionedFile, 0, dataFileChannel, indexFileChannel);
bufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize);
executor = Executors.newFixedThreadPool(numThreads);
readScheduler = new SortMergeResultPartitionReadScheduler(bufferPool, executor, this);
}
@After
- public void after() {
+ public void after() throws Exception {
+ dataFileChannel.close();
+ indexFileChannel.close();
partitionedFile.deleteQuietly();
bufferPool.destroy();
executor.shutdown();
@@ -203,6 +217,33 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
assertAllResourcesReleased();
}
+ @Test(timeout = 60000)
+ public void testNoDeadlockWhenReadAndReleaseBuffers() throws Exception {
+ SortMergeSubpartitionReader subpartitionReader =
+ new SortMergeSubpartitionReader(new NoOpBufferAvailablityListener(), fileReader);
+ Thread readAndReleaseThread =
+ new Thread(
+ () -> {
+ Queue<MemorySegment> segments = new ArrayDeque<>();
+ segments.add(MemorySegmentFactory.allocateUnpooledSegment(bufferSize));
+ try {
+ assertTrue(fileReader.hasRemaining());
+ subpartitionReader.readBuffers(segments, readScheduler);
+ subpartitionReader.releaseAllResources();
+ subpartitionReader.readBuffers(segments, readScheduler);
+ } catch (Exception ignore) {
+ }
+ });
+
+ synchronized (this) {
+ readAndReleaseThread.start();
+ do {
+ Thread.sleep(100);
+ } while (!subpartitionReader.isReleased());
+ }
+ readAndReleaseThread.join();
+ }
+
@Test
public void testRequestBufferTimeoutAndFailed() throws Exception {
Duration bufferRequestTimeout = Duration.ofSeconds(3);
@@ -238,13 +279,6 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
SortMergeResultPartitionReadScheduler readScheduler =
new SortMergeResultPartitionReadScheduler(
bufferPool, executor, this, bufferRequestTimeout);
-
- FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
- FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
-
- PartitionedFileReader fileReader =
- new PartitionedFileReader(partitionedFile, 0, dataFileChannel, indexFileChannel);
-
SortMergeSubpartitionReader subpartitionReader =
new SortMergeSubpartitionReader(new NoOpBufferAvailablityListener(), fileReader);
@@ -261,8 +295,6 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
bufferPool.recycle(allocatedBuffers);
bufferPool.destroy();
- dataFileChannel.close();
- indexFileChannel.close();
readScheduler.release();
}