You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/05/28 16:29:45 UTC

[flink] branch release-1.11 updated (00ca13b -> 79cb1a1)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 00ca13b  [FLINK-17882][table-common] Check for self references in structured types
     new 49fedef  [hotfix][tests] Remove unused TestingScheduledExecutor
     new b8ccab2  [hotfix][tests] Shutdown TaskmanagerServices
     new 6bc156b  [FLINK-17558][tests] Add TestExecutorResource
     new c40a541  [FLINK-17558][runtime] Add Executors#newCachedThreadPool
     new 7fdd488  [FLINK-17558][tests] Simplify partition tracker setup
     new a8145e5  [FLINK-17558][tests] Extract ShuffleEnvironment/PartitionTracker setup
     new 79cb1a1  [FLINK-17558][netty] Release partitions asynchronously

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/runtime/concurrent/Executors.java |  27 +++++
 .../io/network/NettyShuffleEnvironment.java        |  15 ++-
 .../io/network/NettyShuffleServiceFactory.java     |  31 ++++-
 .../runtime/shuffle/ShuffleEnvironmentContext.java |  11 +-
 .../runtime/taskexecutor/TaskManagerRunner.java    |   7 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |  19 ++-
 .../TaskManagerServicesConfiguration.java          |  18 ++-
 .../flink/runtime/concurrent/ExecutorsTest.java    |  63 ++++++++++
 .../io/network/NettyShuffleEnvironmentBuilder.java |  21 +++-
 .../io/network/NettyShuffleEnvironmentTest.java    |  26 +++++
 .../TaskExecutorLocalStateStoresManagerTest.java   |  58 +++++----
 .../TaskExecutorPartitionLifecycleTest.java        | 129 +++++++++++++++------
 .../runtime/util/TestingScheduledExecutor.java     |  62 ----------
 .../testutils/executor/TestExecutorResource.java   |  42 ++++---
 14 files changed, 363 insertions(+), 166 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingScheduledExecutor.java
 copy flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransfer.java => flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorResource.java (51%)


[flink] 01/07: [hotfix][tests] Remove unused TestingScheduledExecutor

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 49fedef6b7a590311023fb481ecea5db48a84391
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 27 12:33:31 2020 +0200

    [hotfix][tests] Remove unused TestingScheduledExecutor
---
 .../runtime/util/TestingScheduledExecutor.java     | 62 ----------------------
 1 file changed, 62 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingScheduledExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingScheduledExecutor.java
deleted file mode 100644
index d9cfb11..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingScheduledExecutor.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
-import org.apache.flink.util.ExecutorUtils;
-
-import org.junit.rules.ExternalResource;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Provide an automatically shut down scheduled executor for testing.
- */
-public class TestingScheduledExecutor extends ExternalResource {
-
-	private long shutdownTimeoutMillis;
-	private ScheduledExecutor scheduledExecutor;
-	private ScheduledExecutorService innerExecutorService;
-
-	public TestingScheduledExecutor() {
-		this(500L);
-	}
-
-	public TestingScheduledExecutor(long shutdownTimeoutMillis) {
-		this.shutdownTimeoutMillis = shutdownTimeoutMillis;
-	}
-
-	@Override
-	public void before() {
-		this.innerExecutorService = Executors.newSingleThreadScheduledExecutor();
-		this.scheduledExecutor = new ScheduledExecutorServiceAdapter(innerExecutorService);
-	}
-
-	@Override
-	protected void after() {
-		ExecutorUtils.gracefulShutdown(shutdownTimeoutMillis, TimeUnit.MILLISECONDS, innerExecutorService);
-	}
-
-	protected ScheduledExecutor getScheduledExecutor() {
-		return scheduledExecutor;
-	}
-}


[flink] 03/07: [FLINK-17558][tests] Add TestExecutorResource

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6bc156b7763b45dd832b1ceac100bc55ec2c0792
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu May 28 14:45:13 2020 +0200

    [FLINK-17558][tests] Add TestExecutorResource
---
 .../testutils/executor/TestExecutorResource.java   | 55 ++++++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorResource.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorResource.java
new file mode 100644
index 0000000..b7619fc
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorResource.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.executor;
+
+import org.junit.rules.ExternalResource;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+
+/**
+ * Resource which starts/stops an {@link ExecutorService} for testing purposes.
+ */
+public class TestExecutorResource extends ExternalResource {
+
+	private final Supplier<ExecutorService> serviceFactory;
+
+	private ExecutorService executorService;
+
+	public TestExecutorResource(Supplier<ExecutorService> serviceFactory) {
+		this.serviceFactory = serviceFactory;
+	}
+
+	@Override
+	protected void before() throws Throwable {
+		executorService = serviceFactory.get();
+	}
+
+	public Executor getExecutor() {
+		// only return an Executor since this resource is in charge of the life cycle
+		return executorService;
+	}
+
+	@Override
+	protected void after() {
+		if (executorService != null) {
+			executorService.shutdown();
+		}
+	}
+}


[flink] 02/07: [hotfix][tests] Shutdown TaskmanagerServices

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b8ccab2f88c94613118570b14d75bdf5bf8f39e7
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 27 13:00:10 2020 +0200

    [hotfix][tests] Shutdown TaskmanagerServices
---
 .../TaskExecutorLocalStateStoresManagerTest.java   | 56 ++++++++++++----------
 1 file changed, 32 insertions(+), 24 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index 15ee290..0560184 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -69,23 +69,27 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
 
 		TaskManagerServices taskManagerServices = createTaskManagerServices(createTaskManagerServiceConfiguration(config));
 
-		TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskManagerStateStore();
-
-		// verify configured directories for local state
-		String[] split = rootDirString.split(",");
-		File[] rootDirectories = taskStateManager.getLocalStateRootDirectories();
-		for (int i = 0; i < split.length; ++i) {
-			Assert.assertEquals(
-				new File(split[i], TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
-				rootDirectories[i]);
-		}
+		try {
+			TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskManagerStateStore();
+
+			// verify configured directories for local state
+			String[] split = rootDirString.split(",");
+			File[] rootDirectories = taskStateManager.getLocalStateRootDirectories();
+			for (int i = 0; i < split.length; ++i) {
+				Assert.assertEquals(
+					new File(split[i], TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
+					rootDirectories[i]);
+			}
 
-		// verify local recovery mode
-		Assert.assertTrue(taskStateManager.isLocalRecoveryEnabled());
+			// verify local recovery mode
+			Assert.assertTrue(taskStateManager.isLocalRecoveryEnabled());
 
-		Assert.assertEquals("localState", TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT);
-		for (File rootDirectory : rootDirectories) {
-			FileUtils.deleteFileOrDirectory(rootDirectory);
+			Assert.assertEquals("localState", TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT);
+			for (File rootDirectory : rootDirectories) {
+				FileUtils.deleteFileOrDirectory(rootDirectory);
+			}
+		} finally {
+			taskManagerServices.shutDown();
 		}
 	}
 
@@ -102,18 +106,22 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
 
 		TaskManagerServices taskManagerServices = createTaskManagerServices(taskManagerServicesConfiguration);
 
-		TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskManagerStateStore();
+		try {
+			TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskManagerStateStore();
 
-		String[] tmpDirPaths = taskManagerServicesConfiguration.getTmpDirPaths();
-		File[] localStateRootDirectories = taskStateManager.getLocalStateRootDirectories();
+			String[] tmpDirPaths = taskManagerServicesConfiguration.getTmpDirPaths();
+			File[] localStateRootDirectories = taskStateManager.getLocalStateRootDirectories();
 
-		for (int i = 0; i < tmpDirPaths.length; ++i) {
-			Assert.assertEquals(
-				new File(tmpDirPaths[i], TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
-				localStateRootDirectories[i]);
-		}
+			for (int i = 0; i < tmpDirPaths.length; ++i) {
+				Assert.assertEquals(
+					new File(tmpDirPaths[i], TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
+					localStateRootDirectories[i]);
+			}
 
-		Assert.assertFalse(taskStateManager.isLocalRecoveryEnabled());
+			Assert.assertFalse(taskStateManager.isLocalRecoveryEnabled());
+		} finally {
+			taskManagerServices.shutDown();
+		}
 	}
 
 	/**


[flink] 06/07: [FLINK-17558][tests] Extract ShuffleEnvironment/PartitionTracker setup

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a8145e547e3a760960b4fd17c532770f281193bc
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 20 14:13:23 2020 +0200

    [FLINK-17558][tests] Extract ShuffleEnvironment/PartitionTracker setup
---
 .../TaskExecutorPartitionLifecycleTest.java        | 27 +++++++++++++++-------
 1 file changed, 19 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 9ca4397..b538b36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -268,7 +268,25 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 		);
 	}
 
-	private <C> void testPartitionRelease(PartitionTrackerSetup<C> partitionTrackerSetup, TestAction<C> testAction) throws Exception {
+	private void testPartitionRelease(PartitionTrackerSetup partitionTrackerSetup, TestAction testAction) throws Exception {
+		final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
+		final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
+		partitionTracker.setStartTrackingPartitionsConsumer((jobId, partitionInfo) -> startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
+		partitionTrackerSetup.accept(partitionTracker);
+
+		internalTestPartitionRelease(
+			partitionTracker,
+			new NettyShuffleEnvironmentBuilder().build(),
+			startTrackingFuture,
+			testAction
+		);
+	}
+
+	private void internalTestPartitionRelease(
+			TaskExecutorPartitionTracker partitionTracker,
+			ShuffleEnvironment<?, ?> shuffleEnvironment,
+			CompletableFuture<ResultPartitionID> startTrackingFuture,
+			TestAction testAction) throws Exception {
 
 		final ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor =
 			PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
@@ -301,8 +319,6 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 			new File[]{tmp.newFolder()},
 			Executors.directExecutor());
 
-		final ShuffleEnvironment<?, ?> shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
-
 		final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
 			.setTaskSlotTable(taskSlotTable)
 			.setTaskStateManager(localStateStoresManager)
@@ -326,11 +342,6 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 			})
 			.build();
 
-		final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
-		final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
-		partitionTracker.setStartTrackingPartitionsConsumer((jobId, partitionInfo) -> startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
-		partitionTrackerSetup.accept(partitionTracker);
-
 		final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices, partitionTracker);
 
 		final CompletableFuture<SlotReport> initialSlotReportFuture = new CompletableFuture<>();


[flink] 04/07: [FLINK-17558][runtime] Add Executors#newCachedThreadPool

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c40a5413b12c17490e24993ced4f8d282be3f955
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 27 12:59:28 2020 +0200

    [FLINK-17558][runtime] Add Executors#newCachedThreadPool
---
 .../apache/flink/runtime/concurrent/Executors.java | 27 ++++++++++
 .../flink/runtime/concurrent/ExecutorsTest.java    | 63 ++++++++++++++++++++++
 2 files changed, 90 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
index 41d9a32..c758752 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -18,8 +18,14 @@
 
 package org.apache.flink.runtime.concurrent;
 
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.ExecutionContext;
 
@@ -61,6 +67,27 @@ public class Executors {
 	}
 
 	/**
+	 * Returns a new cached thread pool with the desired maximum size.
+	 *
+	 * <p>This method is a variation of {@link java.util.concurrent.Executors#newFixedThreadPool(int, ThreadFactory)},
+	 * with the minimum pool size set to 0.
+	 * In that respect it is similar to {@link java.util.concurrent.Executors#newCachedThreadPool()}, but it uses an
+	 * unbounded {@link LinkedBlockingQueue} so that tasks are queued, rather than rejected with an exception, when the
+	 * pool is saturated.
+	 *
+	 * @see ExecutorThreadFactory
+	 * @param maxPoolSize maximum size of the thread pool
+	 * @param threadFactory thread factory to use
+	 * @return new cached thread pool
+	 */
+	public static ExecutorService newCachedThreadPool(int maxPoolSize, ThreadFactory threadFactory) {
+		return new ThreadPoolExecutor(0, maxPoolSize,
+			60L, TimeUnit.SECONDS,
+			new LinkedBlockingQueue<>(),
+			threadFactory);
+	}
+
+	/**
 	 * Direct execution context.
 	 */
 	private static class DirectExecutionContext implements ExecutionContext {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java
new file mode 100644
index 0000000..e3be776
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.core.testutils.BlockerSync;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.testutils.executor.TestExecutorResource;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Tests for {@link Executors}.
+ */
+public class ExecutorsTest {
+
+	@Rule
+	public final TestExecutorResource executorResource = new TestExecutorResource(
+		() -> Executors.newCachedThreadPool(1, new ExecutorThreadFactory()));
+
+	/**
+	 * Tests that the {@link ExecutorService} returned by {@link Executors#newCachedThreadPool(int, ThreadFactory)}
+	 * allows tasks to be queued. In a prior implementation the executor used a synchronous queue, rejecting tasks with
+	 * an exception if no thread was available to process them.
+	 */
+	@Test
+	public void testNewCachedThreadPoolDoesNotRejectTasksExceedingActiveThreadCount() throws InterruptedException {
+		Executor executor = executorResource.getExecutor();
+
+		BlockerSync sync = new BlockerSync();
+		try {
+			// submit the first blocking task, which should block the single pool thread
+			executor.execute(sync::blockNonInterruptible);
+
+			// the thread is now blocked
+			sync.awaitBlocker();
+
+			// this task should not be rejected
+			executor.execute(sync::blockNonInterruptible);
+		} finally {
+			sync.releaseBlocker();
+		}
+	}
+}


[flink] 05/07: [FLINK-17558][tests] Simplify partition tracker setup

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7fdd48864dab360c6f7c01e5f422fb2e02d661b8
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 20 14:11:09 2020 +0200

    [FLINK-17558][tests] Simplify partition tracker setup
---
 .../TaskExecutorPartitionLifecycleTest.java        | 49 ++++++++--------------
 1 file changed, 18 insertions(+), 31 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 9cc5d69..9ca4397 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -212,13 +212,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 
 	@Test
 	public void testPartitionReleaseAfterJobMasterDisconnect() throws Exception {
+		final CompletableFuture<JobID> releasePartitionsForJobFuture = new CompletableFuture<>();
 		testPartitionRelease(
-			partitionTracker -> {
-				final CompletableFuture<JobID> releasePartitionsForJobFuture = new CompletableFuture<>();
-				partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(releasePartitionsForJobFuture::complete);
-				return releasePartitionsForJobFuture;
-			},
-			(jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway, releasePartitionsForJobFuture) -> {
+			partitionTracker -> partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(releasePartitionsForJobFuture::complete),
+			(jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway) -> {
 				taskExecutorGateway.disconnectJobManager(jobId, new Exception("test"));
 
 				assertThat(releasePartitionsForJobFuture.get(), equalTo(jobId));
@@ -228,13 +225,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 
 	@Test
 	public void testPartitionReleaseAfterReleaseCall() throws Exception {
+		final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
 		testPartitionRelease(
-			partitionTracker -> {
-				final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
-				partitionTracker.setStopTrackingAndReleasePartitionsConsumer(releasePartitionsFuture::complete);
-				return releasePartitionsFuture;
-			},
-			(jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway, releasePartitionsFuture) -> {
+			partitionTracker -> partitionTracker.setStopTrackingAndReleasePartitionsConsumer(releasePartitionsFuture::complete),
+			(jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway) -> {
 				final ResultPartitionID resultPartitionId = resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID();
 
 				taskExecutorGateway.releaseOrPromotePartitions(jobId, Collections.singleton(resultPartitionId), Collections.emptySet());
@@ -246,13 +240,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 
 	@Test
 	public void testPartitionPromotion() throws Exception {
+		final CompletableFuture<Collection<ResultPartitionID>> promotePartitionsFuture = new CompletableFuture<>();
 		testPartitionRelease(
-			partitionTracker -> {
-				final CompletableFuture<Collection<ResultPartitionID>> promotePartitionsFuture = new CompletableFuture<>();
-				partitionTracker.setPromotePartitionsConsumer(promotePartitionsFuture::complete);
-				return promotePartitionsFuture;
-			},
-			(jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway, promotePartitionsFuture) -> {
+			partitionTracker -> partitionTracker.setPromotePartitionsConsumer(promotePartitionsFuture::complete),
+			(jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway) -> {
 				final ResultPartitionID resultPartitionId = resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID();
 
 				taskExecutorGateway.releaseOrPromotePartitions(jobId, Collections.emptySet(), Collections.singleton(resultPartitionId));
@@ -264,13 +255,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 
 	@Test
 	public void testClusterPartitionRelease() throws Exception {
+		final CompletableFuture<Collection<IntermediateDataSetID>> releasePartitionsFuture = new CompletableFuture<>();
 		testPartitionRelease(
-			partitionTracker -> {
-				final CompletableFuture<Collection<IntermediateDataSetID>> releasePartitionsFuture = new CompletableFuture<>();
-				partitionTracker.setReleaseClusterPartitionsConsumer(releasePartitionsFuture::complete);
-				return releasePartitionsFuture;
-			},
-			(jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway, releasePartitionsFuture) -> {
+			partitionTracker -> partitionTracker.setReleaseClusterPartitionsConsumer(releasePartitionsFuture::complete),
+			(jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway) -> {
 				final IntermediateDataSetID dataSetId = resultPartitionDeploymentDescriptor.getResultId();
 
 				taskExecutorGateway.releaseClusterPartitions(Collections.singleton(dataSetId), timeout);
@@ -341,7 +329,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 		final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
 		final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
 		partitionTracker.setStartTrackingPartitionsConsumer((jobId, partitionInfo) -> startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
-		C partitionTrackerSetupResult = partitionTrackerSetup.accept(partitionTracker);
+		partitionTrackerSetup.accept(partitionTracker);
 
 		final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices, partitionTracker);
 
@@ -421,8 +409,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 				jobId,
 				taskResultPartitionDescriptor,
 				taskExecutor,
-				taskExecutorGateway,
-				partitionTrackerSetupResult);
+				taskExecutorGateway);
 		} finally {
 			RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
 		}
@@ -478,12 +465,12 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 	}
 
 	@FunctionalInterface
-	private interface PartitionTrackerSetup<C> {
-		C accept(TestingTaskExecutorPartitionTracker partitionTracker) throws Exception;
+	private interface PartitionTrackerSetup {
+		void accept(TestingTaskExecutorPartitionTracker partitionTracker) throws Exception;
 	}
 
 	@FunctionalInterface
-	private interface TestAction<C> {
-		void accept(JobID jobId, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor, TaskExecutor taskExecutor, TaskExecutorGateway taskExecutorGateway, C partitionTrackerSetupResult) throws Exception;
+	private interface TestAction {
+		void accept(JobID jobId, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor, TaskExecutor taskExecutor, TaskExecutorGateway taskExecutorGateway) throws Exception;
 	}
 }


[flink] 07/07: [FLINK-17558][netty] Release partitions asynchronously

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 79cb1a140991d13b2010d10c86e054fcced977c4
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue May 19 16:58:22 2020 +0200

    [FLINK-17558][netty] Release partitions asynchronously
---
 .../io/network/NettyShuffleEnvironment.java        | 15 ++++--
 .../io/network/NettyShuffleServiceFactory.java     | 31 ++++++++++--
 .../runtime/shuffle/ShuffleEnvironmentContext.java | 11 ++++-
 .../runtime/taskexecutor/TaskManagerRunner.java    |  7 ++-
 .../runtime/taskexecutor/TaskManagerServices.java  | 19 ++++----
 .../TaskManagerServicesConfiguration.java          | 18 +++++--
 .../io/network/NettyShuffleEnvironmentBuilder.java | 21 ++++++++-
 .../io/network/NettyShuffleEnvironmentTest.java    | 26 ++++++++++
 .../TaskExecutorLocalStateStoresManagerTest.java   |  2 +-
 .../TaskExecutorPartitionLifecycleTest.java        | 55 ++++++++++++++++++++++
 10 files changed, 179 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
index 8938f5e..61107ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
@@ -57,6 +57,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_INPUT;
 import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_OUTPUT;
@@ -95,6 +96,8 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 
 	private final SingleInputGateFactory singleInputGateFactory;
 
+	private final Executor ioExecutor;
+
 	private boolean isClosed;
 
 	NettyShuffleEnvironment(
@@ -105,7 +108,8 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 			ResultPartitionManager resultPartitionManager,
 			FileChannelManager fileChannelManager,
 			ResultPartitionFactory resultPartitionFactory,
-			SingleInputGateFactory singleInputGateFactory) {
+			SingleInputGateFactory singleInputGateFactory,
+			Executor ioExecutor) {
 		this.taskExecutorResourceId = taskExecutorResourceId;
 		this.config = config;
 		this.networkBufferPool = networkBufferPool;
@@ -115,6 +119,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 		this.fileChannelManager = fileChannelManager;
 		this.resultPartitionFactory = resultPartitionFactory;
 		this.singleInputGateFactory = singleInputGateFactory;
+		this.ioExecutor = ioExecutor;
 		this.isClosed = false;
 	}
 
@@ -149,9 +154,11 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 
 	@Override
 	public void releasePartitionsLocally(Collection<ResultPartitionID> partitionIds) {
-		for (ResultPartitionID partitionId : partitionIds) {
-			resultPartitionManager.releasePartition(partitionId, null);
-		}
+		ioExecutor.execute(() -> {
+			for (ResultPartitionID partitionId : partitionIds) {
+				resultPartitionManager.releasePartition(partitionId, null);
+			}
+		});
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index 9c4f95b..87ce8cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -38,6 +38,8 @@ import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
 import org.apache.flink.runtime.shuffle.ShuffleServiceFactory;
 import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
 
+import java.util.concurrent.Executor;
+
 import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.registerShuffleMetrics;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -65,7 +67,8 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 			networkConfig,
 			shuffleEnvironmentContext.getTaskExecutorResourceId(),
 			shuffleEnvironmentContext.getEventPublisher(),
-			shuffleEnvironmentContext.getParentMetricGroup());
+			shuffleEnvironmentContext.getParentMetricGroup(),
+			shuffleEnvironmentContext.getIoExecutor());
 	}
 
 	@VisibleForTesting
@@ -73,16 +76,33 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 			NettyShuffleEnvironmentConfiguration config,
 			ResourceID taskExecutorResourceId,
 			TaskEventPublisher taskEventPublisher,
-			MetricGroup metricGroup) {
+			MetricGroup metricGroup,
+			Executor ioExecutor) {
+		return createNettyShuffleEnvironment(
+			config,
+			taskExecutorResourceId,
+			taskEventPublisher,
+			new ResultPartitionManager(),
+			metricGroup,
+			ioExecutor);
+	}
+
+	@VisibleForTesting
+	static NettyShuffleEnvironment createNettyShuffleEnvironment(
+			NettyShuffleEnvironmentConfiguration config,
+			ResourceID taskExecutorResourceId,
+			TaskEventPublisher taskEventPublisher,
+			ResultPartitionManager resultPartitionManager,
+			MetricGroup metricGroup,
+			Executor ioExecutor) {
 		checkNotNull(config);
 		checkNotNull(taskExecutorResourceId);
 		checkNotNull(taskEventPublisher);
+		checkNotNull(resultPartitionManager);
 		checkNotNull(metricGroup);
 
 		NettyConfig nettyConfig = config.nettyConfig();
 
-		ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
-
 		FileChannelManager fileChannelManager = new FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX);
 
 		ConnectionManager connectionManager = nettyConfig != null ?
@@ -126,6 +146,7 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 			resultPartitionManager,
 			fileChannelManager,
 			resultPartitionFactory,
-			singleInputGateFactory);
+			singleInputGateFactory,
+			ioExecutor);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
index 7863f18..3116372 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
 
 import java.net.InetAddress;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -40,6 +41,8 @@ public class ShuffleEnvironmentContext {
 	private final TaskEventPublisher eventPublisher;
 	private final MetricGroup parentMetricGroup;
 
+	private final Executor ioExecutor;
+
 	public ShuffleEnvironmentContext(
 			Configuration configuration,
 			ResourceID taskExecutorResourceId,
@@ -47,7 +50,8 @@ public class ShuffleEnvironmentContext {
 			boolean localCommunicationOnly,
 			InetAddress hostAddress,
 			TaskEventPublisher eventPublisher,
-			MetricGroup parentMetricGroup) {
+			MetricGroup parentMetricGroup,
+			Executor ioExecutor) {
 		this.configuration = checkNotNull(configuration);
 		this.taskExecutorResourceId = checkNotNull(taskExecutorResourceId);
 		this.networkMemorySize = networkMemorySize;
@@ -55,6 +59,7 @@ public class ShuffleEnvironmentContext {
 		this.hostAddress = checkNotNull(hostAddress);
 		this.eventPublisher = checkNotNull(eventPublisher);
 		this.parentMetricGroup = checkNotNull(parentMetricGroup);
+		this.ioExecutor = ioExecutor;
 	}
 
 	public Configuration getConfiguration() {
@@ -84,4 +89,8 @@ public class ShuffleEnvironmentContext {
 	public MetricGroup getParentMetricGroup() {
 		return parentMetricGroup;
 	}
+
+	public Executor getIoExecutor() {
+		return ioExecutor;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 88786bf..9772700 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -30,6 +30,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
@@ -366,11 +367,15 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 			resourceID,
 			taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
 
+		final ExecutorService ioExecutor = Executors.newCachedThreadPool(
+			taskManagerServicesConfiguration.getNumIoThreads(),
+			new ExecutorThreadFactory("flink-taskexecutor-io"));
+
 		TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
 			taskManagerServicesConfiguration,
 			blobCacheService.getPermanentBlobService(),
 			taskManagerMetricGroup.f1,
-			rpcService.getExecutor()); // TODO replace this later with some dedicated executor for io.
+			ioExecutor);
 
 		TaskManagerConfiguration taskManagerConfiguration =
 			TaskManagerConfiguration.fromConfiguration(configuration, taskExecutorResourceSpec, externalAddress);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index ee737c1..d6aa2c4 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
-import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -50,7 +49,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 /**
@@ -247,7 +245,7 @@ public class TaskManagerServices {
 	 * @param taskManagerServicesConfiguration task manager configuration
 	 * @param permanentBlobService permanentBlobService used by the services
 	 * @param taskManagerMetricGroup metric group of the task manager
-	 * @param taskIOExecutor executor for async IO operations
+	 * @param ioExecutor executor for async IO operations
 	 * @return task manager components
 	 * @throws Exception
 	 */
@@ -255,7 +253,7 @@ public class TaskManagerServices {
 			TaskManagerServicesConfiguration taskManagerServicesConfiguration,
 			PermanentBlobService permanentBlobService,
 			MetricGroup taskManagerMetricGroup,
-			Executor taskIOExecutor) throws Exception {
+			ExecutorService ioExecutor) throws Exception {
 
 		// pre-start checks
 		checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
@@ -268,7 +266,8 @@ public class TaskManagerServices {
 		final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
 			taskManagerServicesConfiguration,
 			taskEventDispatcher,
-			taskManagerMetricGroup);
+			taskManagerMetricGroup,
+			ioExecutor);
 		final int listeningDataPort = shuffleEnvironment.start();
 
 		final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
@@ -306,9 +305,7 @@ public class TaskManagerServices {
 		final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
 			taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
 			stateRootDirectoryFiles,
-			taskIOExecutor);
-
-		final ExecutorService ioExecutor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory("taskexecutor-io"));
+			ioExecutor);
 
 		final LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
 			permanentBlobService,
@@ -351,7 +348,8 @@ public class TaskManagerServices {
 	private static ShuffleEnvironment<?, ?> createShuffleEnvironment(
 			TaskManagerServicesConfiguration taskManagerServicesConfiguration,
 			TaskEventDispatcher taskEventDispatcher,
-			MetricGroup taskManagerMetricGroup) throws FlinkException {
+			MetricGroup taskManagerMetricGroup,
+			Executor ioExecutor) throws FlinkException {
 
 		final ShuffleEnvironmentContext shuffleEnvironmentContext = new ShuffleEnvironmentContext(
 			taskManagerServicesConfiguration.getConfiguration(),
@@ -360,7 +358,8 @@ public class TaskManagerServices {
 			taskManagerServicesConfiguration.isLocalCommunicationOnly(),
 			taskManagerServicesConfiguration.getBindAddress(),
 			taskEventDispatcher,
-			taskManagerMetricGroup);
+			taskManagerMetricGroup,
+			ioExecutor);
 
 		return ShuffleServiceLoader
 			.loadShuffleServiceFactory(taskManagerServicesConfiguration.getConfiguration())
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index bb50b62..5d126ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
+import org.apache.flink.runtime.util.ClusterEntrypointUtils;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 import org.apache.flink.util.NetUtils;
 
@@ -84,7 +85,9 @@ public class TaskManagerServicesConfiguration {
 
 	private final String[] alwaysParentFirstLoaderPatterns;
 
-	public TaskManagerServicesConfiguration(
+	private final int numIoThreads;
+
+	private TaskManagerServicesConfiguration(
 			Configuration configuration,
 			ResourceID resourceID,
 			String externalAddress,
@@ -102,7 +105,8 @@ public class TaskManagerServicesConfiguration {
 			RetryingRegistrationConfiguration retryingRegistrationConfiguration,
 			Optional<Time> systemResourceMetricsProbingInterval,
 			FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
-			String[] alwaysParentFirstLoaderPatterns) {
+			String[] alwaysParentFirstLoaderPatterns,
+			int numIoThreads) {
 		this.configuration = checkNotNull(configuration);
 		this.resourceID = checkNotNull(resourceID);
 
@@ -121,6 +125,7 @@ public class TaskManagerServicesConfiguration {
 		this.taskExecutorResourceSpec = taskExecutorResourceSpec;
 		this.classLoaderResolveOrder = classLoaderResolveOrder;
 		this.alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderPatterns;
+		this.numIoThreads = numIoThreads;
 
 		checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " +
 			"service shutdown timeout must be greater or equal to 0.");
@@ -215,6 +220,10 @@ public class TaskManagerServicesConfiguration {
 		return alwaysParentFirstLoaderPatterns;
 	}
 
+	public int getNumIoThreads() {
+		return numIoThreads;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Parsing of Flink configuration
 	// --------------------------------------------------------------------------------------------
@@ -262,6 +271,8 @@ public class TaskManagerServicesConfiguration {
 
 		final String[] alwaysParentFirstLoaderPatterns = CoreOptions.getParentFirstLoaderPatterns(configuration);
 
+		final int numIoThreads = ClusterEntrypointUtils.getPoolSize(configuration);
+
 		return new TaskManagerServicesConfiguration(
 			configuration,
 			resourceID,
@@ -280,6 +291,7 @@ public class TaskManagerServicesConfiguration {
 			retryingRegistrationConfiguration,
 			ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration),
 			FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
-			alwaysParentFirstLoaderPatterns);
+			alwaysParentFirstLoaderPatterns,
+			numIoThreads);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index 5c377d2..ab999fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -20,13 +20,16 @@ package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
 import java.time.Duration;
+import java.util.concurrent.Executor;
 
 /**
  * Builder for the {@link NettyShuffleEnvironment}.
@@ -63,6 +66,10 @@ public class NettyShuffleEnvironmentBuilder {
 
 	private MetricGroup metricGroup = UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
 
+	private ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
+
+	private Executor ioExecutor = Executors.directExecutor();
+
 	public NettyShuffleEnvironmentBuilder setTaskManagerLocation(ResourceID taskManagerLocation) {
 		this.taskManagerLocation = taskManagerLocation;
 		return this;
@@ -123,6 +130,16 @@ public class NettyShuffleEnvironmentBuilder {
 		return this;
 	}
 
+	public NettyShuffleEnvironmentBuilder setResultPartitionManager(ResultPartitionManager resultPartitionManager) {
+		this.resultPartitionManager = resultPartitionManager;
+		return this;
+	}
+
+	public NettyShuffleEnvironmentBuilder setIoExecutor(Executor ioExecutor) {
+		this.ioExecutor = ioExecutor;
+		return this;
+	}
+
 	public NettyShuffleEnvironment build() {
 		return NettyShuffleServiceFactory.createNettyShuffleEnvironment(
 			new NettyShuffleEnvironmentConfiguration(
@@ -143,6 +160,8 @@ public class NettyShuffleEnvironmentBuilder {
 				maxBuffersPerChannel),
 			taskManagerLocation,
 			new TaskEventDispatcher(),
-			metricGroup);
+			resultPartitionManager,
+			metricGroup,
+			ioExecutor);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
index 826f15c..62aba7e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
@@ -19,10 +19,13 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.core.testutils.BlockerSync;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
@@ -41,6 +44,8 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.Executors;
 
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
 import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
@@ -100,6 +105,27 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 		testRegisterTaskWithLimitedBuffers(bufferCount);
 	}
 
+	@Test
+	public void testSlowIODoesNotBlockRelease() throws Exception {
+		BlockerSync sync = new BlockerSync();
+		ResultPartitionManager blockingResultPartitionManager = new ResultPartitionManager() {
+			@Override
+			public void releasePartition(ResultPartitionID partitionId, Throwable cause) {
+				sync.blockNonInterruptible();
+				super.releasePartition(partitionId, cause);
+			}
+		};
+
+		NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder()
+			.setResultPartitionManager(blockingResultPartitionManager)
+			.setIoExecutor(Executors.newFixedThreadPool(1))
+			.build();
+
+		shuffleEnvironment.releasePartitionsLocally(Collections.singleton(new ResultPartitionID()));
+		sync.awaitBlocker();
+		sync.releaseBlocker();
+	}
+
 	private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Exception {
 		final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
 			.setNumNetworkBuffers(bufferPoolSize)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index 0560184..958b92c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -227,6 +227,6 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
 			config,
 			VoidPermanentBlobService.INSTANCE,
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-			Executors.directExecutor());
+			Executors.newDirectExecutorService());
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index b538b36..9a3afa1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -38,11 +38,15 @@ import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionInfo;
 import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl;
 import org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -63,6 +67,7 @@ import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
 import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.testutils.executor.TestExecutorResource;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.TriConsumer;
@@ -71,6 +76,7 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -84,6 +90,7 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.StreamSupport;
 
@@ -110,6 +117,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 	@Rule
 	public final TemporaryFolder tmp = new TemporaryFolder();
 
+	@ClassRule
+	public static final TestExecutorResource TEST_EXECUTOR_SERVICE_RESOURCE =
+		new TestExecutorResource(() -> java.util.concurrent.Executors.newFixedThreadPool(1));
+
 	@Before
 	public void setup() {
 		haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
@@ -268,6 +279,50 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 		);
 	}
 
+	@Test
+	public void testBlockingLocalPartitionReleaseDoesNotBlockTaskExecutor() throws Exception {
+		BlockerSync sync = new BlockerSync();
+		ResultPartitionManager blockingResultPartitionManager = new ResultPartitionManager() {
+			@Override
+			public void releasePartition(ResultPartitionID partitionId, Throwable cause) {
+				sync.blockNonInterruptible();
+				super.releasePartition(partitionId, cause);
+			}
+		};
+
+		NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder()
+			.setResultPartitionManager(blockingResultPartitionManager)
+			.setIoExecutor(TEST_EXECUTOR_SERVICE_RESOURCE.getExecutor())
+			.build();
+
+		final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
+		final TaskExecutorPartitionTracker partitionTracker = new TaskExecutorPartitionTrackerImpl(shuffleEnvironment) {
+			@Override
+			public void startTrackingPartition(JobID producingJobId, TaskExecutorPartitionInfo partitionInfo) {
+				super.startTrackingPartition(producingJobId, partitionInfo);
+				startTrackingFuture.complete(partitionInfo.getResultPartitionId());
+			}
+		};
+
+		try {
+			internalTestPartitionRelease(
+				partitionTracker,
+				shuffleEnvironment,
+				startTrackingFuture,
+				(jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway) -> {
+					final IntermediateDataSetID dataSetId = resultPartitionDeploymentDescriptor.getResultId();
+
+					taskExecutorGateway.releaseClusterPartitions(Collections.singleton(dataSetId), timeout);
+
+					// execute some operation to check whether the TaskExecutor is blocked
+					taskExecutorGateway.canBeReleased().get(5, TimeUnit.SECONDS);
+				}
+			);
+		} finally {
+			sync.releaseBlocker();
+		}
+	}
+
 	private void testPartitionRelease(PartitionTrackerSetup partitionTrackerSetup, TestAction testAction) throws Exception {
 		final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
 		final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();