You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/18 15:09:31 UTC
[flink] 01/02: [FLINK-28800][network] BatchShuffleReadIOExecutor using ScheduledExecutorService instead of ExecutorService.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b2c9e00d8a5038e2a50aae57cd38c2dbb8b21db1
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Aug 11 19:53:08 2022 +0800
[FLINK-28800][network] BatchShuffleReadIOExecutor using ScheduledExecutorService instead of ExecutorService.
---
.../apache/flink/runtime/io/network/NettyShuffleEnvironment.java | 8 ++++----
.../flink/runtime/io/network/NettyShuffleServiceFactory.java | 6 +++---
.../runtime/io/network/partition/ResultPartitionFactory.java | 6 +++---
.../runtime/io/network/partition/hybrid/HsFileDataManager.java | 6 +++---
.../runtime/io/network/partition/hybrid/HsResultPartition.java | 4 ++--
.../runtime/io/network/partition/ResultPartitionBuilder.java | 9 +++++----
.../runtime/io/network/partition/ResultPartitionFactoryTest.java | 4 ++--
.../io/network/partition/hybrid/HsFileDataManagerTest.java | 6 +++---
.../io/network/partition/hybrid/HsResultPartitionTest.java | 6 +++---
9 files changed, 28 insertions(+), 27 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
index 2d55cf4c639..ebfc929b148 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
@@ -59,7 +59,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_INPUT;
import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_OUTPUT;
@@ -104,7 +104,7 @@ public class NettyShuffleEnvironment
private final BatchShuffleReadBufferPool batchShuffleReadBufferPool;
- private final ExecutorService batchShuffleReadIOExecutor;
+ private final ScheduledExecutorService batchShuffleReadIOExecutor;
private boolean isClosed;
@@ -119,7 +119,7 @@ public class NettyShuffleEnvironment
SingleInputGateFactory singleInputGateFactory,
Executor ioExecutor,
BatchShuffleReadBufferPool batchShuffleReadBufferPool,
- ExecutorService batchShuffleReadIOExecutor) {
+ ScheduledExecutorService batchShuffleReadIOExecutor) {
this.taskExecutorResourceId = taskExecutorResourceId;
this.config = config;
this.networkBufferPool = networkBufferPool;
@@ -160,7 +160,7 @@ public class NettyShuffleEnvironment
}
@VisibleForTesting
- public ExecutorService getBatchShuffleReadIOExecutor() {
+ public ScheduledExecutorService getBatchShuffleReadIOExecutor() {
return batchShuffleReadIOExecutor;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index 98a202c763a..bd6847f29e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -46,8 +46,8 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Arrays;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.registerShuffleMetrics;
@@ -182,8 +182,8 @@ public class NettyShuffleServiceFactory
// we create a separated IO executor pool here for batch shuffle instead of reusing the
// TaskManager IO executor pool directly to avoid the potential side effects of execution
// contention, for example, too long IO or waiting time leading to starvation or timeout
- ExecutorService batchShuffleReadIOExecutor =
- Executors.newFixedThreadPool(
+ ScheduledExecutorService batchShuffleReadIOExecutor =
+ Executors.newScheduledThreadPool(
Math.max(
1,
Math.min(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index ec5c69ad477..a95f5e76770 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
/** Factory for {@link ResultPartition} to use in {@link NettyShuffleEnvironment}. */
public class ResultPartitionFactory {
@@ -55,7 +55,7 @@ public class ResultPartitionFactory {
private final BatchShuffleReadBufferPool batchShuffleReadBufferPool;
- private final ExecutorService batchShuffleReadIOExecutor;
+ private final ScheduledExecutorService batchShuffleReadIOExecutor;
private final BoundedBlockingSubpartitionType blockingSubpartitionType;
@@ -84,7 +84,7 @@ public class ResultPartitionFactory {
FileChannelManager channelManager,
BufferPoolFactory bufferPoolFactory,
BatchShuffleReadBufferPool batchShuffleReadBufferPool,
- ExecutorService batchShuffleReadIOExecutor,
+ ScheduledExecutorService batchShuffleReadIOExecutor,
BoundedBlockingSubpartitionType blockingSubpartitionType,
int configuredNetworkBuffersPerChannel,
int floatingNetworkBuffersPerGate,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
index 7e8debb8153..eaef4110edc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
@@ -48,7 +48,7 @@ import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -63,7 +63,7 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
private static final Logger LOG = LoggerFactory.getLogger(HsFileDataManager.class);
/** Executor to run the shuffle data reading task. */
- private final Executor ioExecutor;
+ private final ScheduledExecutorService ioExecutor;
/** Maximum number of buffers can be allocated by this partition reader. */
private final int maxRequestedBuffers;
@@ -121,7 +121,7 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
public HsFileDataManager(
BatchShuffleReadBufferPool bufferPool,
- Executor ioExecutor,
+ ScheduledExecutorService ioExecutor,
HsFileDataIndex dataIndex,
Path dataFilePath,
HsSubpartitionFileReader.Factory fileReaderFactory,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
index 15242440ad1..308742909ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
@@ -44,7 +44,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -79,7 +79,7 @@ public class HsResultPartition extends ResultPartition {
int numSubpartitions,
int numTargetKeyGroups,
BatchShuffleReadBufferPool readBufferPool,
- Executor readIOExecutor,
+ ScheduledExecutorService readIOExecutor,
ResultPartitionManager partitionManager,
String dataFileBashPath,
int networkBufferSize,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 2d2edbe36e9..5ef48002b03 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -24,12 +24,12 @@ import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.SupplierWithException;
import java.io.IOException;
import java.util.Optional;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
/** Utility class to encapsulate the logic of building a {@link ResultPartition} instance. */
public class ResultPartitionBuilder {
@@ -56,7 +56,8 @@ public class ResultPartitionBuilder {
private BatchShuffleReadBufferPool batchShuffleReadBufferPool =
new BatchShuffleReadBufferPool(64 * 32 * 1024, 32 * 1024);
- private ExecutorService batchShuffleReadIOExecutor = Executors.newDirectExecutorService();
+ private ScheduledExecutorService batchShuffleReadIOExecutor =
+ Executors.newSingleThreadScheduledExecutor();
private int networkBuffersPerChannel = 1;
@@ -145,7 +146,7 @@ public class ResultPartitionBuilder {
}
public ResultPartitionBuilder setBatchShuffleReadIOExecutor(
- ExecutorService batchShuffleReadIOExecutor) {
+ ScheduledExecutorService batchShuffleReadIOExecutor) {
this.batchShuffleReadIOExecutor = batchShuffleReadIOExecutor;
return this;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 6a22484f0b0..a7d1b04dacb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -27,13 +27,13 @@ import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.util.TestLogger;
-import org.apache.flink.util.concurrent.Executors;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Arrays;
+import java.util.concurrent.Executors;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -149,7 +149,7 @@ public class ResultPartitionFactoryTest extends TestLogger {
fileChannelManager,
new NetworkBufferPool(1, SEGMENT_SIZE),
new BatchShuffleReadBufferPool(10 * SEGMENT_SIZE, SEGMENT_SIZE),
- Executors.newDirectExecutorService(),
+ Executors.newSingleThreadScheduledExecutor(),
BoundedBlockingSubpartitionType.AUTO,
1,
1,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
index 4947f5e7776..cdceb338042 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
@@ -20,12 +20,12 @@ package org.apache.flink.runtime.io.network.partition.hybrid;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.util.TestLoggerExtension;
-import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.function.BiConsumerWithException;
import org.junit.jupiter.api.AfterEach;
@@ -66,7 +66,7 @@ class HsFileDataManagerTest {
private final byte[] dataBytes = new byte[BUFFER_SIZE];
- private ManuallyTriggeredScheduledExecutor ioExecutor;
+ private ManuallyTriggeredScheduledExecutorService ioExecutor;
private BatchShuffleReadBufferPool bufferPool;
@@ -85,7 +85,7 @@ class HsFileDataManagerTest {
Random random = new Random();
random.nextBytes(dataBytes);
bufferPool = new BatchShuffleReadBufferPool(BUFFER_POOL_SIZE * BUFFER_SIZE, BUFFER_SIZE);
- ioExecutor = new ManuallyTriggeredScheduledExecutor();
+ ioExecutor = new ManuallyTriggeredScheduledExecutorService();
dataFilePath = Files.createFile(tempDir.resolve(".data"));
dataFileChannel = openFileChannel(dataFilePath);
factory = new TestingHsSubpartitionFileReader.Factory();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
index 73826440553..b07bcc7e725 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
@@ -59,8 +59,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.Random;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
@@ -85,7 +85,7 @@ class HsResultPartitionTest {
private BatchShuffleReadBufferPool readBufferPool;
- private ExecutorService readIOExecutor;
+ private ScheduledExecutorService readIOExecutor;
private TaskIOMetricGroup taskIOMetricGroup;
@@ -97,7 +97,7 @@ class HsResultPartitionTest {
new FileChannelManagerImpl(new String[] {tempDataPath.toString()}, "testing");
globalPool = new NetworkBufferPool(totalBuffers, bufferSize);
readBufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize);
- readIOExecutor = Executors.newFixedThreadPool(numThreads);
+ readIOExecutor = Executors.newScheduledThreadPool(numThreads);
}
@AfterEach