You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/18 15:09:31 UTC

[flink] 01/02: [FLINK-28800][network] BatchShuffleReadIOExecutor uses ScheduledExecutorService instead of ExecutorService.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b2c9e00d8a5038e2a50aae57cd38c2dbb8b21db1
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Aug 11 19:53:08 2022 +0800

    [FLINK-28800][network] BatchShuffleReadIOExecutor uses ScheduledExecutorService instead of ExecutorService.
---
 .../apache/flink/runtime/io/network/NettyShuffleEnvironment.java | 8 ++++----
 .../flink/runtime/io/network/NettyShuffleServiceFactory.java     | 6 +++---
 .../runtime/io/network/partition/ResultPartitionFactory.java     | 6 +++---
 .../runtime/io/network/partition/hybrid/HsFileDataManager.java   | 6 +++---
 .../runtime/io/network/partition/hybrid/HsResultPartition.java   | 4 ++--
 .../runtime/io/network/partition/ResultPartitionBuilder.java     | 9 +++++----
 .../runtime/io/network/partition/ResultPartitionFactoryTest.java | 4 ++--
 .../io/network/partition/hybrid/HsFileDataManagerTest.java       | 6 +++---
 .../io/network/partition/hybrid/HsResultPartitionTest.java       | 6 +++---
 9 files changed, 28 insertions(+), 27 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
index 2d55cf4c639..ebfc929b148 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
@@ -59,7 +59,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
 import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_INPUT;
 import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_OUTPUT;
@@ -104,7 +104,7 @@ public class NettyShuffleEnvironment
 
     private final BatchShuffleReadBufferPool batchShuffleReadBufferPool;
 
-    private final ExecutorService batchShuffleReadIOExecutor;
+    private final ScheduledExecutorService batchShuffleReadIOExecutor;
 
     private boolean isClosed;
 
@@ -119,7 +119,7 @@ public class NettyShuffleEnvironment
             SingleInputGateFactory singleInputGateFactory,
             Executor ioExecutor,
             BatchShuffleReadBufferPool batchShuffleReadBufferPool,
-            ExecutorService batchShuffleReadIOExecutor) {
+            ScheduledExecutorService batchShuffleReadIOExecutor) {
         this.taskExecutorResourceId = taskExecutorResourceId;
         this.config = config;
         this.networkBufferPool = networkBufferPool;
@@ -160,7 +160,7 @@ public class NettyShuffleEnvironment
     }
 
     @VisibleForTesting
-    public ExecutorService getBatchShuffleReadIOExecutor() {
+    public ScheduledExecutorService getBatchShuffleReadIOExecutor() {
         return batchShuffleReadIOExecutor;
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index 98a202c763a..bd6847f29e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -46,8 +46,8 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.Arrays;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.registerShuffleMetrics;
@@ -182,8 +182,8 @@ public class NettyShuffleServiceFactory
         // we create a separated IO executor pool here for batch shuffle instead of reusing the
         // TaskManager IO executor pool directly to avoid the potential side effects of execution
         // contention, for example, too long IO or waiting time leading to starvation or timeout
-        ExecutorService batchShuffleReadIOExecutor =
-                Executors.newFixedThreadPool(
+        ScheduledExecutorService batchShuffleReadIOExecutor =
+                Executors.newScheduledThreadPool(
                         Math.max(
                                 1,
                                 Math.min(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index ec5c69ad477..a95f5e76770 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
 /** Factory for {@link ResultPartition} to use in {@link NettyShuffleEnvironment}. */
 public class ResultPartitionFactory {
@@ -55,7 +55,7 @@ public class ResultPartitionFactory {
 
     private final BatchShuffleReadBufferPool batchShuffleReadBufferPool;
 
-    private final ExecutorService batchShuffleReadIOExecutor;
+    private final ScheduledExecutorService batchShuffleReadIOExecutor;
 
     private final BoundedBlockingSubpartitionType blockingSubpartitionType;
 
@@ -84,7 +84,7 @@ public class ResultPartitionFactory {
             FileChannelManager channelManager,
             BufferPoolFactory bufferPoolFactory,
             BatchShuffleReadBufferPool batchShuffleReadBufferPool,
-            ExecutorService batchShuffleReadIOExecutor,
+            ScheduledExecutorService batchShuffleReadIOExecutor,
             BoundedBlockingSubpartitionType blockingSubpartitionType,
             int configuredNetworkBuffersPerChannel,
             int floatingNetworkBuffersPerGate,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
index 7e8debb8153..eaef4110edc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
@@ -48,7 +48,7 @@ import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -63,7 +63,7 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
     private static final Logger LOG = LoggerFactory.getLogger(HsFileDataManager.class);
 
     /** Executor to run the shuffle data reading task. */
-    private final Executor ioExecutor;
+    private final ScheduledExecutorService ioExecutor;
 
     /** Maximum number of buffers can be allocated by this partition reader. */
     private final int maxRequestedBuffers;
@@ -121,7 +121,7 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
 
     public HsFileDataManager(
             BatchShuffleReadBufferPool bufferPool,
-            Executor ioExecutor,
+            ScheduledExecutorService ioExecutor,
             HsFileDataIndex dataIndex,
             Path dataFilePath,
             HsSubpartitionFileReader.Factory fileReaderFactory,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
index 15242440ad1..308742909ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
@@ -44,7 +44,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -79,7 +79,7 @@ public class HsResultPartition extends ResultPartition {
             int numSubpartitions,
             int numTargetKeyGroups,
             BatchShuffleReadBufferPool readBufferPool,
-            Executor readIOExecutor,
+            ScheduledExecutorService readIOExecutor,
             ResultPartitionManager partitionManager,
             String dataFileBashPath,
             int networkBufferSize,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 2d2edbe36e9..5ef48002b03 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -24,12 +24,12 @@ import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.function.SupplierWithException;
 
 import java.io.IOException;
 import java.util.Optional;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 /** Utility class to encapsulate the logic of building a {@link ResultPartition} instance. */
 public class ResultPartitionBuilder {
@@ -56,7 +56,8 @@ public class ResultPartitionBuilder {
     private BatchShuffleReadBufferPool batchShuffleReadBufferPool =
             new BatchShuffleReadBufferPool(64 * 32 * 1024, 32 * 1024);
 
-    private ExecutorService batchShuffleReadIOExecutor = Executors.newDirectExecutorService();
+    private ScheduledExecutorService batchShuffleReadIOExecutor =
+            Executors.newSingleThreadScheduledExecutor();
 
     private int networkBuffersPerChannel = 1;
 
@@ -145,7 +146,7 @@ public class ResultPartitionBuilder {
     }
 
     public ResultPartitionBuilder setBatchShuffleReadIOExecutor(
-            ExecutorService batchShuffleReadIOExecutor) {
+            ScheduledExecutorService batchShuffleReadIOExecutor) {
         this.batchShuffleReadIOExecutor = batchShuffleReadIOExecutor;
         return this;
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 6a22484f0b0..a7d1b04dacb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -27,13 +27,13 @@ import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 import org.apache.flink.util.TestLogger;
-import org.apache.flink.util.concurrent.Executors;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.concurrent.Executors;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -149,7 +149,7 @@ public class ResultPartitionFactoryTest extends TestLogger {
                         fileChannelManager,
                         new NetworkBufferPool(1, SEGMENT_SIZE),
                         new BatchShuffleReadBufferPool(10 * SEGMENT_SIZE, SEGMENT_SIZE),
-                        Executors.newDirectExecutorService(),
+                        Executors.newSingleThreadScheduledExecutor(),
                         BoundedBlockingSubpartitionType.AUTO,
                         1,
                         1,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
index 4947f5e7776..cdceb338042 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
@@ -20,12 +20,12 @@ package org.apache.flink.runtime.io.network.partition.hybrid;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
 import org.apache.flink.util.TestLoggerExtension;
-import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.util.function.BiConsumerWithException;
 
 import org.junit.jupiter.api.AfterEach;
@@ -66,7 +66,7 @@ class HsFileDataManagerTest {
 
     private final byte[] dataBytes = new byte[BUFFER_SIZE];
 
-    private ManuallyTriggeredScheduledExecutor ioExecutor;
+    private ManuallyTriggeredScheduledExecutorService ioExecutor;
 
     private BatchShuffleReadBufferPool bufferPool;
 
@@ -85,7 +85,7 @@ class HsFileDataManagerTest {
         Random random = new Random();
         random.nextBytes(dataBytes);
         bufferPool = new BatchShuffleReadBufferPool(BUFFER_POOL_SIZE * BUFFER_SIZE, BUFFER_SIZE);
-        ioExecutor = new ManuallyTriggeredScheduledExecutor();
+        ioExecutor = new ManuallyTriggeredScheduledExecutorService();
         dataFilePath = Files.createFile(tempDir.resolve(".data"));
         dataFileChannel = openFileChannel(dataFilePath);
         factory = new TestingHsSubpartitionFileReader.Factory();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
index 73826440553..b07bcc7e725 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
@@ -59,8 +59,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Queue;
 import java.util.Random;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 
@@ -85,7 +85,7 @@ class HsResultPartitionTest {
 
     private BatchShuffleReadBufferPool readBufferPool;
 
-    private ExecutorService readIOExecutor;
+    private ScheduledExecutorService readIOExecutor;
 
     private TaskIOMetricGroup taskIOMetricGroup;
 
@@ -97,7 +97,7 @@ class HsResultPartitionTest {
                 new FileChannelManagerImpl(new String[] {tempDataPath.toString()}, "testing");
         globalPool = new NetworkBufferPool(totalBuffers, bufferSize);
         readBufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize);
-        readIOExecutor = Executors.newFixedThreadPool(numThreads);
+        readIOExecutor = Executors.newScheduledThreadPool(numThreads);
     }
 
     @AfterEach