You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/20 14:17:30 UTC

[GitHub] [flink] tillrohrmann commented on a change in pull request #12264: [FLINK-17558][netty] Release partitions asynchronously

tillrohrmann commented on a change in pull request #12264:
URL: https://github.com/apache/flink/pull/12264#discussion_r428047035



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
##########
@@ -280,7 +273,65 @@ public void testClusterPartitionRelease() throws Exception {
 		);
 	}
 
-	private <C> void testPartitionRelease(PartitionTrackerSetup<C> partitionTrackerSetup, TestAction<C> testAction) throws Exception {
+	@Test
+	public void testBlockingLocalPartitionReleaseDoesNotBlockTaskExecutor() throws Exception {
+		BlockerSync sync = new BlockerSync();
+		ResultPartitionManager blockingResultPartitionManager = new ResultPartitionManager() {
+			@Override
+			public void releasePartition(ResultPartitionID partitionId, Throwable cause) {
+				sync.blockNonInterruptible();
+				super.releasePartition(partitionId, cause);
+			}
+		};
+
+		NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder()
+			.setResultPartitionManager(blockingResultPartitionManager)
+			.setIoExecutor(java.util.concurrent.Executors.newFixedThreadPool(1))

Review comment:
       I would suggest to also shut this executor service down at the end of the test. It might be necessary to unblock the release operation for this.

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
##########
@@ -490,6 +490,13 @@
 				+ " size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min/max"
 				+ " size to the same value.");
 
+	@Documentation.ExcludeFromDocumentation("This option just serves as a last-ditch escape hatch.")
+	public static final ConfigOption<Integer> NUM_IO_THREADS =
+		key("taskmanager.io.threads.num")
+			.intType()
+			.defaultValue(2)
+			.withDescription("The number of threads to use for non-critical IO operations.");

Review comment:
       We might be able to unify this configuration option with `ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
##########
@@ -265,10 +265,15 @@ public static TaskManagerServices fromConfiguration(
 		// start the I/O manager, it will create some temp directories.
 		final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
 
+		final ExecutorService ioExecutor = Executors.newFixedThreadPool(

Review comment:
       Can the `ioExecutor` also replace the `taskIOExecutor`?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
##########
@@ -100,6 +105,27 @@ public void testRegisterTaskWithInsufficientBuffers() throws Exception {
 		testRegisterTaskWithLimitedBuffers(bufferCount);
 	}
 
+	@Test
+	public void testSlowIODoesNotBlockRelease() throws Exception {
+		BlockerSync sync = new BlockerSync();

Review comment:
       I guess a `OneShotLatch` would also work here if the test threads call the trigger on it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org