You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/05/28 15:46:43 UTC

[flink] 07/07: [FLINK-17558][netty] Release partitions asynchronously

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c6c0839d40571bb9ad47b8dbbec0b0ea14615372
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue May 19 16:58:22 2020 +0200

    [FLINK-17558][netty] Release partitions asynchronously
---
 .../io/network/NettyShuffleEnvironment.java        | 15 ++++--
 .../io/network/NettyShuffleServiceFactory.java     | 31 +++++++++++--
 .../runtime/shuffle/ShuffleEnvironmentContext.java | 11 ++++-
 .../runtime/taskexecutor/TaskManagerRunner.java    |  7 ++-
 .../runtime/taskexecutor/TaskManagerServices.java  | 16 ++++---
 .../TaskManagerServicesConfiguration.java          | 16 ++++++-
 .../io/network/NettyShuffleEnvironmentBuilder.java | 21 ++++++++-
 .../io/network/NettyShuffleEnvironmentTest.java    | 26 +++++++++++
 .../TaskExecutorLocalStateStoresManagerTest.java   |  2 +-
 .../TaskExecutorPartitionLifecycleTest.java        | 53 ++++++++++++++++++++++
 10 files changed, 177 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
index 2d2d6f3..5e7c119 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
@@ -56,6 +56,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_INPUT;
 import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_OUTPUT;
@@ -94,6 +95,8 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 
 	private final SingleInputGateFactory singleInputGateFactory;
 
+	private final Executor ioExecutor;
+
 	private boolean isClosed;
 
 	NettyShuffleEnvironment(
@@ -104,7 +107,8 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 			ResultPartitionManager resultPartitionManager,
 			FileChannelManager fileChannelManager,
 			ResultPartitionFactory resultPartitionFactory,
-			SingleInputGateFactory singleInputGateFactory) {
+			SingleInputGateFactory singleInputGateFactory,
+			Executor ioExecutor) {
 		this.taskExecutorResourceId = taskExecutorResourceId;
 		this.config = config;
 		this.networkBufferPool = networkBufferPool;
@@ -114,6 +118,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 		this.fileChannelManager = fileChannelManager;
 		this.resultPartitionFactory = resultPartitionFactory;
 		this.singleInputGateFactory = singleInputGateFactory;
+		this.ioExecutor = ioExecutor;
 		this.isClosed = false;
 	}
 
@@ -148,9 +153,11 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 
 	@Override
 	public void releasePartitionsLocally(Collection<ResultPartitionID> partitionIds) {
-		for (ResultPartitionID partitionId : partitionIds) {
-			resultPartitionManager.releasePartition(partitionId, null);
-		}
+		ioExecutor.execute(() -> {
+			for (ResultPartitionID partitionId : partitionIds) {
+				resultPartitionManager.releasePartition(partitionId, null);
+			}
+		});
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index afbaba2..f065122 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -38,6 +38,8 @@ import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
 import org.apache.flink.runtime.shuffle.ShuffleServiceFactory;
 import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
 
+import java.util.concurrent.Executor;
+
 import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.registerShuffleMetrics;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -65,7 +67,8 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 			networkConfig,
 			shuffleEnvironmentContext.getTaskExecutorResourceId(),
 			shuffleEnvironmentContext.getEventPublisher(),
-			shuffleEnvironmentContext.getParentMetricGroup());
+			shuffleEnvironmentContext.getParentMetricGroup(),
+			shuffleEnvironmentContext.getIoExecutor());
 	}
 
 	@VisibleForTesting
@@ -73,16 +76,33 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 			NettyShuffleEnvironmentConfiguration config,
 			ResourceID taskExecutorResourceId,
 			TaskEventPublisher taskEventPublisher,
-			MetricGroup metricGroup) {
+			MetricGroup metricGroup,
+			Executor ioExecutor) {
+		return createNettyShuffleEnvironment(
+			config,
+			taskExecutorResourceId,
+			taskEventPublisher,
+			new ResultPartitionManager(),
+			metricGroup,
+			ioExecutor);
+	}
+
+	@VisibleForTesting
+	static NettyShuffleEnvironment createNettyShuffleEnvironment(
+			NettyShuffleEnvironmentConfiguration config,
+			ResourceID taskExecutorResourceId,
+			TaskEventPublisher taskEventPublisher,
+			ResultPartitionManager resultPartitionManager,
+			MetricGroup metricGroup,
+			Executor ioExecutor) {
 		checkNotNull(config);
 		checkNotNull(taskExecutorResourceId);
 		checkNotNull(taskEventPublisher);
+		checkNotNull(resultPartitionManager);
 		checkNotNull(metricGroup);
 
 		NettyConfig nettyConfig = config.nettyConfig();
 
-		ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
-
 		FileChannelManager fileChannelManager = new FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX);
 
 		ConnectionManager connectionManager = nettyConfig != null ?
@@ -125,6 +145,7 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 			resultPartitionManager,
 			fileChannelManager,
 			resultPartitionFactory,
-			singleInputGateFactory);
+			singleInputGateFactory,
+			ioExecutor);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
index 7863f18..3116372 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
 
 import java.net.InetAddress;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -40,6 +41,8 @@ public class ShuffleEnvironmentContext {
 	private final TaskEventPublisher eventPublisher;
 	private final MetricGroup parentMetricGroup;
 
+	private final Executor ioExecutor;
+
 	public ShuffleEnvironmentContext(
 			Configuration configuration,
 			ResourceID taskExecutorResourceId,
@@ -47,7 +50,8 @@ public class ShuffleEnvironmentContext {
 			boolean localCommunicationOnly,
 			InetAddress hostAddress,
 			TaskEventPublisher eventPublisher,
-			MetricGroup parentMetricGroup) {
+			MetricGroup parentMetricGroup,
+			Executor ioExecutor) {
 		this.configuration = checkNotNull(configuration);
 		this.taskExecutorResourceId = checkNotNull(taskExecutorResourceId);
 		this.networkMemorySize = networkMemorySize;
@@ -55,6 +59,7 @@ public class ShuffleEnvironmentContext {
 		this.hostAddress = checkNotNull(hostAddress);
 		this.eventPublisher = checkNotNull(eventPublisher);
 		this.parentMetricGroup = checkNotNull(parentMetricGroup);
+		this.ioExecutor = ioExecutor;
 	}
 
 	public Configuration getConfiguration() {
@@ -84,4 +89,8 @@ public class ShuffleEnvironmentContext {
 	public MetricGroup getParentMetricGroup() {
 		return parentMetricGroup;
 	}
+
+	public Executor getIoExecutor() {
+		return ioExecutor;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 019d186..7f66d03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -31,6 +31,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.entrypoint.ClusterConfiguration;
@@ -373,10 +374,14 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 			resourceID,
 			taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
 
+		final ExecutorService ioExecutor = Executors.newCachedThreadPool(
+			taskManagerServicesConfiguration.getNumIoThreads(),
+			new ExecutorThreadFactory("flink-taskexecutor-io"));
+
 		TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
 			taskManagerServicesConfiguration,
 			taskManagerMetricGroup.f1,
-			rpcService.getExecutor()); // TODO replace this later with some dedicated executor for io.
+			ioExecutor);
 
 		TaskManagerConfiguration taskManagerConfiguration =
 			TaskManagerConfiguration.fromConfiguration(configuration, taskExecutorResourceSpec);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 04c3b29..a443fb4 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 /**
@@ -208,14 +209,14 @@ public class TaskManagerServices {
 	 *
 	 * @param taskManagerServicesConfiguration task manager configuration
 	 * @param taskManagerMetricGroup metric group of the task manager
-	 * @param taskIOExecutor executor for async IO operations
+	 * @param ioExecutor executor for async IO operations
 	 * @return task manager components
 	 * @throws Exception
 	 */
 	public static TaskManagerServices fromConfiguration(
 			TaskManagerServicesConfiguration taskManagerServicesConfiguration,
 			MetricGroup taskManagerMetricGroup,
-			Executor taskIOExecutor) throws Exception {
+			ExecutorService ioExecutor) throws Exception {
 
 		// pre-start checks
 		checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
@@ -228,7 +229,8 @@ public class TaskManagerServices {
 		final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
 			taskManagerServicesConfiguration,
 			taskEventDispatcher,
-			taskManagerMetricGroup);
+			taskManagerMetricGroup,
+			ioExecutor);
 		final int dataPort = shuffleEnvironment.start();
 
 		final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
@@ -262,7 +264,7 @@ public class TaskManagerServices {
 		final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
 			taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
 			stateRootDirectoryFiles,
-			taskIOExecutor);
+			ioExecutor);
 
 		return new TaskManagerServices(
 			taskManagerLocation,
@@ -297,7 +299,8 @@ public class TaskManagerServices {
 	private static ShuffleEnvironment<?, ?> createShuffleEnvironment(
 			TaskManagerServicesConfiguration taskManagerServicesConfiguration,
 			TaskEventDispatcher taskEventDispatcher,
-			MetricGroup taskManagerMetricGroup) throws FlinkException {
+			MetricGroup taskManagerMetricGroup,
+			Executor ioExecutor) throws FlinkException {
 
 		final ShuffleEnvironmentContext shuffleEnvironmentContext = new ShuffleEnvironmentContext(
 			taskManagerServicesConfiguration.getConfiguration(),
@@ -306,7 +309,8 @@ public class TaskManagerServices {
 			taskManagerServicesConfiguration.isLocalCommunicationOnly(),
 			taskManagerServicesConfiguration.getTaskManagerAddress(),
 			taskEventDispatcher,
-			taskManagerMetricGroup);
+			taskManagerMetricGroup,
+			ioExecutor);
 
 		return ShuffleServiceLoader
 			.loadShuffleServiceFactory(taskManagerServicesConfiguration.getConfiguration())
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 5ec28e0..480394c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
+import org.apache.flink.runtime.util.ClusterEntrypointUtils;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 
 import javax.annotation.Nullable;
@@ -71,6 +72,8 @@ public class TaskManagerServicesConfiguration {
 
 	private final TaskExecutorResourceSpec taskExecutorResourceSpec;
 
+	private final int numIoThreads;
+
 	public TaskManagerServicesConfiguration(
 			Configuration configuration,
 			ResourceID resourceID,
@@ -85,7 +88,8 @@ public class TaskManagerServicesConfiguration {
 			TaskExecutorResourceSpec taskExecutorResourceSpec,
 			long timerServiceShutdownTimeout,
 			RetryingRegistrationConfiguration retryingRegistrationConfiguration,
-			Optional<Time> systemResourceMetricsProbingInterval) {
+			Optional<Time> systemResourceMetricsProbingInterval,
+			int numIoThreads) {
 		this.configuration = checkNotNull(configuration);
 		this.resourceID = checkNotNull(resourceID);
 
@@ -100,6 +104,7 @@ public class TaskManagerServicesConfiguration {
 		this.pageSize = pageSize;
 
 		this.taskExecutorResourceSpec = taskExecutorResourceSpec;
+		this.numIoThreads = numIoThreads;
 
 		checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " +
 			"service shutdown timeout must be greater or equal to 0.");
@@ -178,6 +183,10 @@ public class TaskManagerServicesConfiguration {
 		return retryingRegistrationConfiguration;
 	}
 
+	public int getNumIoThreads() {
+		return numIoThreads;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Parsing of Flink configuration
 	// --------------------------------------------------------------------------------------------
@@ -215,6 +224,8 @@ public class TaskManagerServicesConfiguration {
 
 		final RetryingRegistrationConfiguration retryingRegistrationConfiguration = RetryingRegistrationConfiguration.fromConfiguration(configuration);
 
+		final int numIoThreads = ClusterEntrypointUtils.getPoolSize(configuration);
+
 		return new TaskManagerServicesConfiguration(
 			configuration,
 			resourceID,
@@ -229,6 +240,7 @@ public class TaskManagerServicesConfiguration {
 			taskExecutorResourceSpec,
 			timerServiceShutdownTimeout,
 			retryingRegistrationConfiguration,
-			ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
+			ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration),
+			numIoThreads);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index ec29f82..26949cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -20,13 +20,16 @@ package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
 import java.time.Duration;
+import java.util.concurrent.Executor;
 
 /**
  * Builder for the {@link NettyShuffleEnvironment}.
@@ -59,6 +62,10 @@ public class NettyShuffleEnvironmentBuilder {
 
 	private MetricGroup metricGroup = UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
 
+	private ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
+
+	private Executor ioExecutor = Executors.directExecutor();
+
 	public NettyShuffleEnvironmentBuilder setTaskManagerLocation(ResourceID taskManagerLocation) {
 		this.taskManagerLocation = taskManagerLocation;
 		return this;
@@ -109,6 +116,16 @@ public class NettyShuffleEnvironmentBuilder {
 		return this;
 	}
 
+	public NettyShuffleEnvironmentBuilder setResultPartitionManager(ResultPartitionManager resultPartitionManager) {
+		this.resultPartitionManager = resultPartitionManager;
+		return this;
+	}
+
+	public NettyShuffleEnvironmentBuilder setIoExecutor(Executor ioExecutor) {
+		this.ioExecutor = ioExecutor;
+		return this;
+	}
+
 	public NettyShuffleEnvironment build() {
 		return NettyShuffleServiceFactory.createNettyShuffleEnvironment(
 			new NettyShuffleEnvironmentConfiguration(
@@ -128,6 +145,8 @@ public class NettyShuffleEnvironmentBuilder {
 				compressionCodec),
 			taskManagerLocation,
 			new TaskEventDispatcher(),
-			metricGroup);
+			resultPartitionManager,
+			metricGroup,
+			ioExecutor);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
index b1a23a4..c0b094f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
@@ -20,10 +20,13 @@ package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.core.testutils.BlockerSync;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -40,6 +43,8 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.Executors;
 
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
 import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
@@ -99,6 +104,27 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 		testRegisterTaskWithLimitedBuffers(bufferCount);
 	}
 
+	@Test
+	public void testSlowIODoesNotBlockRelease() throws Exception {
+		BlockerSync sync = new BlockerSync();
+		ResultPartitionManager blockingResultPartitionManager = new ResultPartitionManager() {
+			@Override
+			public void releasePartition(ResultPartitionID partitionId, Throwable cause) {
+				sync.blockNonInterruptible();
+				super.releasePartition(partitionId, cause);
+			}
+		};
+
+		NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder()
+			.setResultPartitionManager(blockingResultPartitionManager)
+			.setIoExecutor(Executors.newFixedThreadPool(1))
+			.build();
+
+		shuffleEnvironment.releasePartitionsLocally(Collections.singleton(new ResultPartitionID()));
+		sync.awaitBlocker();
+		sync.releaseBlocker();
+	}
+
 	private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Exception {
 		final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
 			.setNumNetworkBuffers(bufferPoolSize)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index 49d6fb4..bb0079c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -226,6 +226,6 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
 		return TaskManagerServices.fromConfiguration(
 			config,
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-			Executors.directExecutor());
+			Executors.newDirectExecutorService());
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 81b651f..66de0d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -37,11 +37,15 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionInfo;
 import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl;
 import org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
@@ -61,6 +65,7 @@ import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
 import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.testutils.executor.TestExecutorResource;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.TriConsumer;
@@ -69,6 +74,7 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -81,6 +87,7 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.StreamSupport;
 
@@ -107,6 +114,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 	@Rule
 	public final TemporaryFolder tmp = new TemporaryFolder();
 
+	@ClassRule
+	public static final TestExecutorResource TEST_EXECUTOR_SERVICE_RESOURCE =
+		new TestExecutorResource(() -> java.util.concurrent.Executors.newFixedThreadPool(1));
+
 	@Before
 	public void setup() {
 		haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
@@ -243,6 +254,48 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 		);
 	}
 
+	@Test
+	public void testBlockingLocalPartitionReleaseDoesNotBlockTaskExecutor() throws Exception {
+		BlockerSync sync = new BlockerSync();
+		ResultPartitionManager blockingResultPartitionManager = new ResultPartitionManager() {
+			@Override
+			public void releasePartition(ResultPartitionID partitionId, Throwable cause) {
+				sync.blockNonInterruptible();
+				super.releasePartition(partitionId, cause);
+			}
+		};
+
+		NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder()
+			.setResultPartitionManager(blockingResultPartitionManager)
+			.setIoExecutor(TEST_EXECUTOR_SERVICE_RESOURCE.getExecutor())
+			.build();
+
+		final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
+		final TaskExecutorPartitionTracker partitionTracker = new TaskExecutorPartitionTrackerImpl(shuffleEnvironment) {
+			@Override
+			public void startTrackingPartition(JobID producingJobId, TaskExecutorPartitionInfo partitionInfo) {
+				super.startTrackingPartition(producingJobId, partitionInfo);
+				startTrackingFuture.complete(partitionInfo.getResultPartitionId());
+			}
+		};
+
+		try {
+			internalTestPartitionRelease(
+				partitionTracker,
+				shuffleEnvironment,
+				startTrackingFuture,
+				(jobId, partitionId, taskExecutor, taskExecutorGateway) -> {
+					taskExecutorGateway.releaseOrPromotePartitions(jobId, Collections.singleton(partitionId), Collections.emptySet());
+
+					// execute some operation to check whether the TaskExecutor is blocked
+					taskExecutorGateway.canBeReleased().get(5, TimeUnit.SECONDS);
+				}
+			);
+		} finally {
+			sync.releaseBlocker();
+		}
+	}
+
 	private void testPartitionRelease(PartitionTrackerSetup partitionTrackerSetup, TestAction testAction) throws Exception {
 		final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
 		final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();