You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/05/28 15:46:43 UTC
[flink] 07/07: [FLINK-17558][netty] Release partitions asynchronously
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c6c0839d40571bb9ad47b8dbbec0b0ea14615372
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue May 19 16:58:22 2020 +0200
[FLINK-17558][netty] Release partitions asynchronously
---
.../io/network/NettyShuffleEnvironment.java | 15 ++++--
.../io/network/NettyShuffleServiceFactory.java | 31 +++++++++++--
.../runtime/shuffle/ShuffleEnvironmentContext.java | 11 ++++-
.../runtime/taskexecutor/TaskManagerRunner.java | 7 ++-
.../runtime/taskexecutor/TaskManagerServices.java | 16 ++++---
.../TaskManagerServicesConfiguration.java | 16 ++++++-
.../io/network/NettyShuffleEnvironmentBuilder.java | 21 ++++++++-
.../io/network/NettyShuffleEnvironmentTest.java | 26 +++++++++++
.../TaskExecutorLocalStateStoresManagerTest.java | 2 +-
.../TaskExecutorPartitionLifecycleTest.java | 53 ++++++++++++++++++++++
10 files changed, 177 insertions(+), 21 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
index 2d2d6f3..5e7c119 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
@@ -56,6 +56,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_INPUT;
import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_OUTPUT;
@@ -94,6 +95,8 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
private final SingleInputGateFactory singleInputGateFactory;
+ private final Executor ioExecutor;
+
private boolean isClosed;
NettyShuffleEnvironment(
@@ -104,7 +107,8 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
ResultPartitionManager resultPartitionManager,
FileChannelManager fileChannelManager,
ResultPartitionFactory resultPartitionFactory,
- SingleInputGateFactory singleInputGateFactory) {
+ SingleInputGateFactory singleInputGateFactory,
+ Executor ioExecutor) {
this.taskExecutorResourceId = taskExecutorResourceId;
this.config = config;
this.networkBufferPool = networkBufferPool;
@@ -114,6 +118,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
this.fileChannelManager = fileChannelManager;
this.resultPartitionFactory = resultPartitionFactory;
this.singleInputGateFactory = singleInputGateFactory;
+ this.ioExecutor = ioExecutor;
this.isClosed = false;
}
@@ -148,9 +153,11 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
@Override
public void releasePartitionsLocally(Collection<ResultPartitionID> partitionIds) {
- for (ResultPartitionID partitionId : partitionIds) {
- resultPartitionManager.releasePartition(partitionId, null);
- }
+ ioExecutor.execute(() -> {
+ for (ResultPartitionID partitionId : partitionIds) {
+ resultPartitionManager.releasePartition(partitionId, null);
+ }
+ });
}
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index afbaba2..f065122 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -38,6 +38,8 @@ import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
import org.apache.flink.runtime.shuffle.ShuffleServiceFactory;
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
+import java.util.concurrent.Executor;
+
import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.registerShuffleMetrics;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -65,7 +67,8 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
networkConfig,
shuffleEnvironmentContext.getTaskExecutorResourceId(),
shuffleEnvironmentContext.getEventPublisher(),
- shuffleEnvironmentContext.getParentMetricGroup());
+ shuffleEnvironmentContext.getParentMetricGroup(),
+ shuffleEnvironmentContext.getIoExecutor());
}
@VisibleForTesting
@@ -73,16 +76,33 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
NettyShuffleEnvironmentConfiguration config,
ResourceID taskExecutorResourceId,
TaskEventPublisher taskEventPublisher,
- MetricGroup metricGroup) {
+ MetricGroup metricGroup,
+ Executor ioExecutor) {
+ return createNettyShuffleEnvironment(
+ config,
+ taskExecutorResourceId,
+ taskEventPublisher,
+ new ResultPartitionManager(),
+ metricGroup,
+ ioExecutor);
+ }
+
+ @VisibleForTesting
+ static NettyShuffleEnvironment createNettyShuffleEnvironment(
+ NettyShuffleEnvironmentConfiguration config,
+ ResourceID taskExecutorResourceId,
+ TaskEventPublisher taskEventPublisher,
+ ResultPartitionManager resultPartitionManager,
+ MetricGroup metricGroup,
+ Executor ioExecutor) {
checkNotNull(config);
checkNotNull(taskExecutorResourceId);
checkNotNull(taskEventPublisher);
+ checkNotNull(resultPartitionManager);
checkNotNull(metricGroup);
NettyConfig nettyConfig = config.nettyConfig();
- ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
-
FileChannelManager fileChannelManager = new FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX);
ConnectionManager connectionManager = nettyConfig != null ?
@@ -125,6 +145,7 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
resultPartitionManager,
fileChannelManager,
resultPartitionFactory,
- singleInputGateFactory);
+ singleInputGateFactory,
+ ioExecutor);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
index 7863f18..3116372 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import java.net.InetAddress;
+import java.util.concurrent.Executor;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -40,6 +41,8 @@ public class ShuffleEnvironmentContext {
private final TaskEventPublisher eventPublisher;
private final MetricGroup parentMetricGroup;
+ private final Executor ioExecutor;
+
public ShuffleEnvironmentContext(
Configuration configuration,
ResourceID taskExecutorResourceId,
@@ -47,7 +50,8 @@ public class ShuffleEnvironmentContext {
boolean localCommunicationOnly,
InetAddress hostAddress,
TaskEventPublisher eventPublisher,
- MetricGroup parentMetricGroup) {
+ MetricGroup parentMetricGroup,
+ Executor ioExecutor) {
this.configuration = checkNotNull(configuration);
this.taskExecutorResourceId = checkNotNull(taskExecutorResourceId);
this.networkMemorySize = networkMemorySize;
@@ -55,6 +59,7 @@ public class ShuffleEnvironmentContext {
this.hostAddress = checkNotNull(hostAddress);
this.eventPublisher = checkNotNull(eventPublisher);
this.parentMetricGroup = checkNotNull(parentMetricGroup);
+ this.ioExecutor = ioExecutor;
}
public Configuration getConfiguration() {
@@ -84,4 +89,8 @@ public class ShuffleEnvironmentContext {
public MetricGroup getParentMetricGroup() {
return parentMetricGroup;
}
+
+ public Executor getIoExecutor() {
+ return ioExecutor;
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 019d186..7f66d03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -31,6 +31,7 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.entrypoint.ClusterConfiguration;
@@ -373,10 +374,14 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
resourceID,
taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
+ final ExecutorService ioExecutor = Executors.newCachedThreadPool(
+ taskManagerServicesConfiguration.getNumIoThreads(),
+ new ExecutorThreadFactory("flink-taskexecutor-io"));
+
TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
taskManagerServicesConfiguration,
taskManagerMetricGroup.f1,
- rpcService.getExecutor()); // TODO replace this later with some dedicated executor for io.
+ ioExecutor);
TaskManagerConfiguration taskManagerConfiguration =
TaskManagerConfiguration.fromConfiguration(configuration, taskExecutorResourceSpec);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 04c3b29..a443fb4 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/**
@@ -208,14 +209,14 @@ public class TaskManagerServices {
*
* @param taskManagerServicesConfiguration task manager configuration
* @param taskManagerMetricGroup metric group of the task manager
- * @param taskIOExecutor executor for async IO operations
+ * @param ioExecutor executor for async IO operations
* @return task manager components
* @throws Exception
*/
public static TaskManagerServices fromConfiguration(
TaskManagerServicesConfiguration taskManagerServicesConfiguration,
MetricGroup taskManagerMetricGroup,
- Executor taskIOExecutor) throws Exception {
+ ExecutorService ioExecutor) throws Exception {
// pre-start checks
checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
@@ -228,7 +229,8 @@ public class TaskManagerServices {
final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
taskManagerServicesConfiguration,
taskEventDispatcher,
- taskManagerMetricGroup);
+ taskManagerMetricGroup,
+ ioExecutor);
final int dataPort = shuffleEnvironment.start();
final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
@@ -262,7 +264,7 @@ public class TaskManagerServices {
final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
stateRootDirectoryFiles,
- taskIOExecutor);
+ ioExecutor);
return new TaskManagerServices(
taskManagerLocation,
@@ -297,7 +299,8 @@ public class TaskManagerServices {
private static ShuffleEnvironment<?, ?> createShuffleEnvironment(
TaskManagerServicesConfiguration taskManagerServicesConfiguration,
TaskEventDispatcher taskEventDispatcher,
- MetricGroup taskManagerMetricGroup) throws FlinkException {
+ MetricGroup taskManagerMetricGroup,
+ Executor ioExecutor) throws FlinkException {
final ShuffleEnvironmentContext shuffleEnvironmentContext = new ShuffleEnvironmentContext(
taskManagerServicesConfiguration.getConfiguration(),
@@ -306,7 +309,8 @@ public class TaskManagerServices {
taskManagerServicesConfiguration.isLocalCommunicationOnly(),
taskManagerServicesConfiguration.getTaskManagerAddress(),
taskEventDispatcher,
- taskManagerMetricGroup);
+ taskManagerMetricGroup,
+ ioExecutor);
return ShuffleServiceLoader
.loadShuffleServiceFactory(taskManagerServicesConfiguration.getConfiguration())
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 5ec28e0..480394c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
+import org.apache.flink.runtime.util.ClusterEntrypointUtils;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import javax.annotation.Nullable;
@@ -71,6 +72,8 @@ public class TaskManagerServicesConfiguration {
private final TaskExecutorResourceSpec taskExecutorResourceSpec;
+ private final int numIoThreads;
+
public TaskManagerServicesConfiguration(
Configuration configuration,
ResourceID resourceID,
@@ -85,7 +88,8 @@ public class TaskManagerServicesConfiguration {
TaskExecutorResourceSpec taskExecutorResourceSpec,
long timerServiceShutdownTimeout,
RetryingRegistrationConfiguration retryingRegistrationConfiguration,
- Optional<Time> systemResourceMetricsProbingInterval) {
+ Optional<Time> systemResourceMetricsProbingInterval,
+ int numIoThreads) {
this.configuration = checkNotNull(configuration);
this.resourceID = checkNotNull(resourceID);
@@ -100,6 +104,7 @@ public class TaskManagerServicesConfiguration {
this.pageSize = pageSize;
this.taskExecutorResourceSpec = taskExecutorResourceSpec;
+ this.numIoThreads = numIoThreads;
checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " +
"service shutdown timeout must be greater or equal to 0.");
@@ -178,6 +183,10 @@ public class TaskManagerServicesConfiguration {
return retryingRegistrationConfiguration;
}
+ public int getNumIoThreads() {
+ return numIoThreads;
+ }
+
// --------------------------------------------------------------------------------------------
// Parsing of Flink configuration
// --------------------------------------------------------------------------------------------
@@ -215,6 +224,8 @@ public class TaskManagerServicesConfiguration {
final RetryingRegistrationConfiguration retryingRegistrationConfiguration = RetryingRegistrationConfiguration.fromConfiguration(configuration);
+ final int numIoThreads = ClusterEntrypointUtils.getPoolSize(configuration);
+
return new TaskManagerServicesConfiguration(
configuration,
resourceID,
@@ -229,6 +240,7 @@ public class TaskManagerServicesConfiguration {
taskExecutorResourceSpec,
timerServiceShutdownTimeout,
retryingRegistrationConfiguration,
- ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
+ ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration),
+ numIoThreads);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index ec29f82..26949cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -20,13 +20,16 @@ package org.apache.flink.runtime.io.network;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
import org.apache.flink.runtime.util.EnvironmentInformation;
import java.time.Duration;
+import java.util.concurrent.Executor;
/**
* Builder for the {@link NettyShuffleEnvironment}.
@@ -59,6 +62,10 @@ public class NettyShuffleEnvironmentBuilder {
private MetricGroup metricGroup = UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
+ private ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
+
+ private Executor ioExecutor = Executors.directExecutor();
+
public NettyShuffleEnvironmentBuilder setTaskManagerLocation(ResourceID taskManagerLocation) {
this.taskManagerLocation = taskManagerLocation;
return this;
@@ -109,6 +116,16 @@ public class NettyShuffleEnvironmentBuilder {
return this;
}
+ public NettyShuffleEnvironmentBuilder setResultPartitionManager(ResultPartitionManager resultPartitionManager) {
+ this.resultPartitionManager = resultPartitionManager;
+ return this;
+ }
+
+ public NettyShuffleEnvironmentBuilder setIoExecutor(Executor ioExecutor) {
+ this.ioExecutor = ioExecutor;
+ return this;
+ }
+
public NettyShuffleEnvironment build() {
return NettyShuffleServiceFactory.createNettyShuffleEnvironment(
new NettyShuffleEnvironmentConfiguration(
@@ -128,6 +145,8 @@ public class NettyShuffleEnvironmentBuilder {
compressionCodec),
taskManagerLocation,
new TaskEventDispatcher(),
- metricGroup);
+ resultPartitionManager,
+ metricGroup,
+ ioExecutor);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
index b1a23a4..c0b094f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
@@ -20,10 +20,13 @@ package org.apache.flink.runtime.io.network;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.core.testutils.BlockerSync;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -40,6 +43,8 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.Executors;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
@@ -99,6 +104,27 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
testRegisterTaskWithLimitedBuffers(bufferCount);
}
+ @Test
+ public void testSlowIODoesNotBlockRelease() throws Exception {
+ BlockerSync sync = new BlockerSync();
+ ResultPartitionManager blockingResultPartitionManager = new ResultPartitionManager() {
+ @Override
+ public void releasePartition(ResultPartitionID partitionId, Throwable cause) {
+ sync.blockNonInterruptible();
+ super.releasePartition(partitionId, cause);
+ }
+ };
+
+ NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder()
+ .setResultPartitionManager(blockingResultPartitionManager)
+ .setIoExecutor(Executors.newFixedThreadPool(1))
+ .build();
+
+ shuffleEnvironment.releasePartitionsLocally(Collections.singleton(new ResultPartitionID()));
+ sync.awaitBlocker();
+ sync.releaseBlocker();
+ }
+
private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Exception {
final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
.setNumNetworkBuffers(bufferPoolSize)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index 49d6fb4..bb0079c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -226,6 +226,6 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
return TaskManagerServices.fromConfiguration(
config,
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
- Executors.directExecutor());
+ Executors.newDirectExecutorService());
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 81b651f..66de0d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -37,11 +37,15 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionInfo;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl;
import org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
@@ -61,6 +65,7 @@ import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.TriConsumer;
@@ -69,6 +74,7 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -81,6 +87,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.StreamSupport;
@@ -107,6 +114,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
@Rule
public final TemporaryFolder tmp = new TemporaryFolder();
+ @ClassRule
+ public static final TestExecutorResource TEST_EXECUTOR_SERVICE_RESOURCE =
+ new TestExecutorResource(() -> java.util.concurrent.Executors.newFixedThreadPool(1));
+
@Before
public void setup() {
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
@@ -243,6 +254,48 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
);
}
+ @Test
+ public void testBlockingLocalPartitionReleaseDoesNotBlockTaskExecutor() throws Exception {
+ BlockerSync sync = new BlockerSync();
+ ResultPartitionManager blockingResultPartitionManager = new ResultPartitionManager() {
+ @Override
+ public void releasePartition(ResultPartitionID partitionId, Throwable cause) {
+ sync.blockNonInterruptible();
+ super.releasePartition(partitionId, cause);
+ }
+ };
+
+ NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder()
+ .setResultPartitionManager(blockingResultPartitionManager)
+ .setIoExecutor(TEST_EXECUTOR_SERVICE_RESOURCE.getExecutor())
+ .build();
+
+ final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
+ final TaskExecutorPartitionTracker partitionTracker = new TaskExecutorPartitionTrackerImpl(shuffleEnvironment) {
+ @Override
+ public void startTrackingPartition(JobID producingJobId, TaskExecutorPartitionInfo partitionInfo) {
+ super.startTrackingPartition(producingJobId, partitionInfo);
+ startTrackingFuture.complete(partitionInfo.getResultPartitionId());
+ }
+ };
+
+ try {
+ internalTestPartitionRelease(
+ partitionTracker,
+ shuffleEnvironment,
+ startTrackingFuture,
+ (jobId, partitionId, taskExecutor, taskExecutorGateway) -> {
+ taskExecutorGateway.releaseOrPromotePartitions(jobId, Collections.singleton(partitionId), Collections.emptySet());
+
+ // execute some operation to check whether the TaskExecutor is blocked
+ taskExecutorGateway.canBeReleased().get(5, TimeUnit.SECONDS);
+ }
+ );
+ } finally {
+ sync.releaseBlocker();
+ }
+ }
+
private void testPartitionRelease(PartitionTrackerSetup partitionTrackerSetup, TestAction testAction) throws Exception {
final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();